Serverless Event Driven Systems with Confluent Cloud and AWS Lambda

Introduction

Here I present an end-to-end example of a Serverless event driven architecture using Confluent Cloud for stream processing paired with AWS Lambda for event responsive logic using the Serverless Application Model (SAM) framework. Together this architecture will compose a system for fictitious financial stock quote email alerting. The use case being demonstrated assumes that you already have an active Confluent Cloud Account as well as an AWS Account. Once the two accounts are ready to go the process can be described stepwise as folows.

1. Download and install the Confluent Cloud ccloud CLI
2. Create Kafka Cluster and User API Key with ccloud CLI
3. Creating a Kafka Topic for the Stock Data
4. Create Service Account and API Keys for Kafka Clients
5. Enable the Confluent Schema Registry in Confluent Cloud
6. Create ksqlDB Application using ccloud CLI
7. Launch Datagen Connector in Confluent Cloud for Stock Data Generation
8. Stream Processing with ksqlDB
9. Provisioning a Serverless Consumer Application in AWS Cloud
10. Launch AWS Lambda Sink Connector in Confluent Cloud

The flow of data among the components and environments of this system is depicted in the diagram below.

Systems Integration Diagram

All Cloud Service configuration and provisioning for this tutorial will be done through interactive CLI commands and Infrastructure as Code (IaC). I've found this is important for two reasons. First, cloud based service providers like AWS and Confluent have a history of continuously updating their UIs and thus screenshots of specific screens and settings often become inaccurate too quickly to be useful. Second, in practice it is much more realistic and valuable to use automated methods such as CLI based commands or IaC over clicking around in a UI.

Excited? Then lets get started!

Download and install the Confluent Cloud ccloud CLI

Confluent already has fantastic ccloud CLI installation docs so please refer to them if you've not yet installed it.

However, I do have a word of caution. Confluent Cloud is a relatively new service offering and Confluent is rapidly evolving it. I cannot guarentee that the CLI commands used will remain stable / backwards compatible forever but, I am reasonably sure it will be more stable than the UI (even though I must admit its a pretty slick UI ... hat tip to the Confluent frontend dev team).

I've listed the version of the ccloud CLI I'm using below.

ccloud --version

Version output.

ccloud version v1.32.0

Do make sure you've logged in with ccloud using your desired Confluent Cloud account credentials (email and password)

ccloud login --prompt

Create Kafka Cluster and User API Key with ccloud CLI

Confluent Cloud comes with a default environment but, for this tutorial I will use a fresh one. This way I can isolate the Kafka Cluster, ksqlDB application, service accounts and associated api-keys to just this demo which I can then throw away once the demo is done.

Create an environment.

ccloud environment create serverless-stocks-demo

Output will look similar to this but with different environment ID.

+------------------+------------------------+
| Environment Name | serverless-stocks-demo |
| Id               | env-g526n              |
+------------------+------------------------+

Set the ccloud CLI to use the newly created environment.

ccloud environment use env-g526n

Now I can create a basic Kafka Cluster named stocks-cluster in the AWS cloud within the us-east-2 region as shown below.

ccloud kafka cluster create stocks-cluster --cloud aws --region us-east-2 --type basic

This produces output metadata about the new cluster.

It may take up to 5 minutes for the Kafka cluster to be ready.
+--------------+---------------------------------------------------------+
| Id           | lkc-x88n1                                               |
| Name         | stocks-cluster                                          |
| Type         | BASIC                                                   |
| Ingress      |                                                     100 |
| Egress       |                                                     100 |
| Storage      |                                                    5000 |
| Provider     | aws                                                     |
| Availability | single-zone                                             |
| Region       | us-east-2                                               |
| Status       | UP                                                      |
| Endpoint     | SASL_SSL://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 |
| ApiEndpoint  | https://pkac-4vnm0.us-east-2.aws.confluent.cloud        |
| RestEndpoint |                                                         |
+--------------+---------------------------------------------------------+

Next I use the Cluster ID of the newly created cluster to configure the ccloud CLI to use it as the default cluster for subsequent commands.

ccloud kafka cluster use lkc-x88n1

I need to make a user API Key for adminstrative tasks like creating ksqlDB applications for the cluster along with creating and configuring topics in the Kafka cluster itself.

ccloud api-key create --resource lkc-x88n1

The output should be similar to this.

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | AOGURDQHU4H4643L                                                 |
| Secret  | KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj |
+---------+------------------------------------------------------------------+

As the warning says, you will need to wait a couple of minutes before progressing as the API Keys take some time to provision. You should also store the secret somewhere safe (I'll be deleting mine along with this entire serverless-stocks-demo environment).

Creating a Kafka Topic for the Stock Data

This will be a pretty simple step where I demonstrate how to create a topic named stocks that shall eventually receive stock data from the Confluent Datagen Connector.

ccloud kafka topic create stocks --partitions 3 --cluster lkc-x88n1

Create Service Account and API Keys for Kafka Clients

In this section I demonstrate how to create a service account for any regular ole Kafka clients to produce data to or consume data from Kafka topics. While this isn't exactly required for this example use case I have found it to be a helpful exercise in understanding how service accounts are used with Access Control Lists (ACLs) and how to configure Kafka clients to properly authenticate with Confluent Cloud.

The following creates a service account.

ccloud service-account create stocks-cluster-sa --description "For client applications"

Output.

+-------------+-------------------------+
| Id          |                  244423 |
| Resource ID | sa-lg0ow1               |
| Name        | stocks-cluster-sa       |
| Description | For client applications |
+-------------+-------------------------+

Next I create a key/secret pair associated with the service account to be used for authentication in client applications for the previously created Kafka cluster.

ccloud api-key create --service-account 244423 --resource lkc-x88n1

Output.

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | M5TTN4C3IFCXJ7BE                                                 |
| Secret  | IlRwRJul7ds1JrE8lUiRYo+cXM2J7pxEqu/MsCf6ktETVTadmULrdrt35rAx4+YL |
+---------+------------------------------------------------------------------+

I then use the key/secret pair values along with the Kafka cluster endpoint to create a properties file for authenticating standard Kafka CLI Clients.

The Kafka Cluster endpoint can be retrieved like so.

ccloud kafka cluster describe lkc-x88n1

Output.

+--------------+---------------------------------------------------------+
| Id           | lkc-x88n1                                               |
| Name         | stocks-cluster                                          |
| Type         | BASIC                                                   |
| Ingress      |                                                     100 |
| Egress       |                                                     100 |
| Storage      |                                                    5000 |
| Provider     | aws                                                     |
| Availability | single-zone                                             |
| Region       | us-east-2                                               |
| Status       | UP                                                      |
| Endpoint     | SASL_SSL://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 |
| ApiEndpoint  | https://pkac-4vnm0.us-east-2.aws.confluent.cloud        |
| RestEndpoint |                                                         |
+--------------+---------------------------------------------------------+

Then using this information I create a client-auth.properties file as shown below.

bootstrap.servers=pkc-ep9mm.us-east-2.aws.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="M5TTN4C3IFCXJ7BE" password="IlRwRJul7ds1JrE8lUiRYo+cXM2J7pxEqu/MsCf6ktETVTadmULrdrt35rAx4+YL";

However, before I can use those credentials and associated service account to write data to the stocks topic or consume data from it I have to assign it a set of ACLs.

ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation WRITE --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation READ --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation DESCRIBE --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation DESCRIBE_CONFIGS --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation READ --consumer-group '*'

As for working with the Kafka CLI clients you can follow along by downloading the Confluent Platform Community tools or just the standard Apache Kafka distribution.

I will use the standard CLI clients that come with the Apache Kafka open source binary distribution. If I were to down Apache Kafka 2.12-2.7.0 Scala binary distribution and extract it my local directory then producing some stock data would look as follows.

cd kafka_2.12-2.7.0/
./bin/kafka-console-producer.sh --bootstrap-server pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 --topic stocks \
  --producer.config /path/to/client-auth.properties \
  --property parse.key=true --property key.separator=:

Enter some random stock quote values for the companies ACME and HOOLI then CTRL+C to terminate the producer.

>ACME:{"symbol":"ACME","quote":101.01}
>HOOLI:{"symbol":"HOOLI","quote":54.28}
>ACME:{"symbol":"ACME","quote":101.99}

Now I can authenticate with the CLI consumer and consume the data that was produced to the Confluent Cloud Kafka Cluster.

./bin/kafka-console-consumer.sh --bootstrap-server pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 --topic stocks \
   --consumer.config /path/to/client-auth.properties \
   --property print.key=true --from-beginning

With output (CTRL+C to exit).

ACME	{"symbol":"ACME","quote":101.01}
HOOLI	{"symbol":"HOOLI","quote":54.28}
ACME	{"symbol":"ACME","quote":101.99}

This simple dummy data I just produced with the kafka-console-producer.sh program doesn't quite fit the data schema of the stocks data to be used with the Confluent Datagen connect later on so, I'll clear it out by deleting and recreating the topic.

Delete it.

ccloud kafka topic delete stocks --cluster lkc-x88n1

Recreate it.

ccloud kafka topic create stocks --partitions 3 --cluster lkc-x88n1

Enable the Confluent Schema Registry in Confluent Cloud

Its actually really important to enable Schema Registry before Launching any ksqlDB apps. If you do this the other way around previously launched ksqlDB apps will not be able to recognize the Schema Registry

Its generally a good idea to utilize the Schema Registry to provide both flexibility as well as validation of your data representations. Enabling the Schema Registry is as simple as specifying the AWS Cloud provider you want it to run in (AWS for this tutorial) as well as the Geographic region which I'll specify as US to go along with the us-east-2 region I placed the Kakfa Cluster in.

ccloud schema-registry cluster enable --cloud aws --geo us

This outputs the URL for the REST API of the Schema Registry.

+--------------+--------------------------------------------------+
| Id           | lsrc-rxmgp                                       |
| Endpoint URL | https://psrc-vrpp5.us-east-2.aws.confluent.cloud |
+--------------+--------------------------------------------------+

Then a separate API Key / Secret pair is needed for the Confluent Schema Registry which again can be created using the ccloud CLI.

ccloud api-key create --resource lsrc-rxmgp

Output.

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | BSILNNZVYIVEEJFH                                                 |
| Secret  | V1G4KgnPQkRL48hjNrVisHfbgqOByIhLVPbYZmVrSDatY1AK0l73V+yFdjrsPl5F |
+---------+------------------------------------------------------------------+

Create ksqlDB Application using ccloud CLI

After waiting a few minutes I create a ksqlDB application named stocks-ksql.

ccloud ksql app create stocks-ksql --cluster lkc-x88n1 \
  --api-key AOGURDQHU4H4643L \
  --api-secret KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj

Then output should be similar to the following.

+--------------+--------------------------------------------------------+
| Id           | lksqlc-311jm                                           |
| Name         | stocks-ksql                                            |
| Topic Prefix | pksqlc-oqz99                                           |
| Kafka        | lkc-x88n1                                              |
| Storage      |                                                    500 |
| Endpoint     | https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443 |
| Status       | PROVISIONING                                           |
+--------------+--------------------------------------------------------+

Something that is not quite immediately obvious about the above command is that a service account is created for the ksql application which you can see by listing the service accounts like so.

ccloud service-account list

Giving output similar to this.

    Id   | Resource ID |       Name        |          Description
+--------+-------------+-------------------+--------------------------------+
  244411 | sa-l69ro2   | KSQL.lksqlc-311jm | SA for KSQL w/ ID lksqlc-311jm
         |             |                   | and Name stocks-ksql

Service accounts are generally assoicated with a particular application such as a Java or Python client program producing messages to Kafak or consuming messages from Kafka, a Kafka Streams application and, of course, a ksqlDB application.

Launch Datagen Connector in Confluent Cloud for Stock Data Generation

In an earlier post How to Use Kafka Connect Datagen with Docker and Confluent Schema Registry I demonstrated how to use the Kafka Connect Datagen Connector with a local Docker provisioned Confluent Platform environment. This section will be very similar but, instead of running the connector plugin locally I deploy it to Confluent Cloud.

The DatagenSource connector available in Confluent Cloud doesn't support custom schema definition (as of the time of me writing this) so you must select from one of the predefined quickstart templates. I will be using the STOCK_TRADES quick start template which you can find the schema definition on the GitHub Kafka Connect Datagen Repository.

Confluent Cloud comes with a number of standard Confluent certified connectors which you can list with the below command and, of course, the Confluent DatagenSource is among them.

ccloud connector-catalog list

Output.

         PluginName        |  Type
+--------------------------+--------+
  MicrosoftSqlServerSink   | sink
  MySqlCdcSource           | source
  PubSubSource             | source
  MongoDbAtlasSink         | sink
  MySqlSource              | source
  AzureEventHubsSource     | source
  KinesisSource            | source
  ElasticsearchSink        | sink
  S3_SINK                  | sink
  PostgresCdcSource        | source
  SalesforceCdcSource      | source
  PostgresSource           | source
  OracleDatabaseSource     | source
  MicrosoftSqlServerSource | source
  RedshiftSink             | sink
  SnowflakeSink            | sink
  DatagenSource            | source
  MongoDbAtlasSource       | source
  LambdaSink               | sink
  SqlServerCdcSource       | source
  MySqlSink                | sink
  PostgresSink             | sink

For details on how to configure the Datagen Source connector you can describe it using the ccloud CLI or browse the docs on Confluent.

ccloud connector-catalog describe DatagenSource

Output.

Following are the required configs:
connector.class: DatagenSource
name : ["name" is required]
kafka.api.key : ["kafka.api.key" is required]
kafka.api.secret : ["kafka.api.secret" is required]
kafka.topic : [Missing required configuration "kafka.topic" which has no default value. "kafka.topic" is required]
output.data.format : ["output.data.format" is required Value "null" doesn't belong to the property's "output.data.format" enum Value "null" is not a valid "Output message format" type]
quickstart : ["quickstart" is required Value "null" is not a valid "Quickstart" type]
tasks.max : ["tasks.max" is required]

Armed with the information on the configuration properties I create a connector-config.json file with the following specifications to generate stock data using the Datagen Connector.

{
  "name": "stocks-datagen",
  "connector.class": "DatagenSource",
  "kafka.api.key": "AOGURDQHU4H4643L",
  "kafka.api.secret": "KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj",
  "kafka.topic": "stocks",
  "output.data.format": "AVRO",
  "quickstart": "STOCK_TRADES",
  "max.interval":"1000",
  "tasks.max": "1" 
}

To create the connector I have a few options:

1) I can create it in the Confluent Cloud UI which is a nice UI

2) I can use the ccloud CLI to create the connector using the connector-config.json file just created as demonstrated below.

ccloud connector create --config connector-config.json --cluster lkc-x88n1

Then after creating you can view the status of it with the following.

ccloud connector list

Output.

     ID     |      Name      | Status  |  Type  | Trace
+-----------+----------------+---------+--------+-------+
  lcc-rxm51 | stocks-datagen | RUNNING | source |

3) I can use the connect REST API endpoint with a standard HTTP Client like HTTPie.

In order to use the REST API I need a cloud resource scoped key/secret pair which are created as shown below.

ccloud api-key create --resource cloud

Output.

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | 3LTWD2R2LXHKWHUQ                                                 |
| Secret  | LZ6jqQKYuTZZreBp2ltcnE2wjHnmvAZHKR9j2Z9hs1oSHxMAfwDeIpRs8QMUJQrl |
+---------+------------------------------------------------------------------+

Then use the environment and cluster IDs along with the newly created cloud api key set to construct a PUT HTTP request as shown below.

ENV_ID=env-g526n
CLUSTER_ID=lkc-x88n1
CONNECTOR_NAME=stocks-datagen
REST_URL="https://api.confluent.cloud/connect/v1/environments/$ENV_ID/clusters/$CLUSTER_ID/connectors/$CONNECTOR_NAME/config"
AUTH="3LTWD2R2LXHKWHUQ:LZ6jqQKYuTZZreBp2ltcnE2wjHnmvAZHKR9j2Z9hs1oSHxMAfwDeIpRs8QMUJQrl"
http PUT $REST_URL @connector-config.json --auth $AUTH

Output.

{
    "config": {
        "cloud.environment": "prod",
        "cloud.provider": "aws",
        "connector.class": "DatagenSource",
        "internal.kafka.endpoint": "PLAINTEXT://kafka-0.kafka.pkc-ep9mm.svc.cluster.local:9071,kafka-1.kafka.pkc-ep9mm.svc.cluster.local:9071,kafka-2.kafka.pkc-ep9mm.svc.cluster.local:9071",
        "kafka.api.key": "****************",
        "kafka.api.secret": "****************",
        "kafka.dedicated": "false",
        "kafka.endpoint": "SASL_SSL://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092",
        "kafka.region": "us-east-2",
        "kafka.topic": "stocks",
        "max.interval": "1000",
        "name": "stocks-datagen",
        "output.data.format": "AVRO",
        "quickstart": "STOCK_TRADES",
        "tasks.max": "1",
        "valid.kafka.api.key": "true"
    },
    "name": "stocks-datagen",
    "tasks": [],
    "type": "source"
}

Similar to the ccloud CLI usage you can use the REST API to view the status of the connector.

REST_URL="https://api.confluent.cloud/connect/v1/environments/$ENV_ID/clusters/$CLUSTER_ID/connectors/$CONNECTOR_NAME/status"
http $REST_URL --auth $AUTH

Output.

{
    "connector": {
        "state": "RUNNING",
        "trace": "",
        "worker_id": "stocks-datagen"
    },
    "name": "stocks-datagen",
    "tasks": [
        {
            "id": 0,
            "msg": "",
            "state": "RUNNING",
            "worker_id": "stocks-datagen"
        }
    ],
    "type": "source"
}

Stream Processing with ksqlDB

In this section I use ksqlDB to find high and low stock prices as a function of percentages above and below average stock prices for each company generated from the DatagenSource connector. As new high or low stock prices are found I dump those into a new stream and subsequent topic which can then be consumed and react ed to them by sending an email alert.

First things first I create a new API key for the ksqlDB application using the service account that was created automatically when the ksqlDB app was created. I can easily find that using the following ccloud command.

ccloud service-account list

Output below shows the service account has ID 244411.

    Id   | Resource ID |       Name        |          Description
+--------+-------------+-------------------+--------------------------------+
  244411 | sa-l69ro2   | KSQL.lksqlc-311jm | SA for KSQL w/ ID lksqlc-311jm
         |             |                   | and Name stocks-ksql
  244423 | sa-lg0ow1   | stocks-cluster-sa | For client applications

I also need to have the ksqlDB application resource ID so let me use ccloud to find that as well.

ccloud ksql app list

Output below shows the ksqlDB app ID of lksqlc-311jm

       Id      |    Name     | Topic Prefix |   Kafka   | Storage |                        Endpoint                        | Status
+--------------+-------------+--------------+-----------+---------+--------------------------------------------------------+--------+
  lksqlc-311jm | stocks-ksql | pksqlc-oqz99 | lkc-x88n1 |     500 | https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443 | UP

Then I create the ksqlDB specific API key/secret pair for the service account.

ccloud api-key create --service-account 244411 --resource lksqlc-311jm

Output which should be saved to a file somewhere safe because the secret cannot be retrieved again.

It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | SGZBQVP6SMSFSFNG                                                 |
| Secret  | 12ivUbttwinXr3TMcUJL4vYFE4RCg1d1vaqri6Ztz6M7oJsNbLxd2sHbzk+bhPGr |
+---------+------------------------------------------------------------------+

I now have the credentials established and can begin using the ksql CLI to construct and execute the queries needed for stream processing.

If you have downloaded and installed the Confluent Platform Community tools you can use the ksql CLI directly.

ksql -u SGZBQVP6SMSFSFNG \
  -p 12ivUbttwinXr3TMcUJL4vYFE4RCg1d1vaqri6Ztz6M7oJsNbLxd2sHbzk+bhPGr \
  https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443

Or if you prefer Docker you can use the ksqldb-cli Docker image provided by Confluent like so.

docker run --rm -it confluentinc/ksqldb-cli:0.18.0 ksql -u SGZBQVP6SMSFSFNG \
  -p 12ivUbttwinXr3TMcUJL4vYFE4RCg1d1vaqri6Ztz6M7oJsNbLxd2sHbzk+bhPGr \
  https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443

From within the ksql CLI shell you can list topics.

ksql> show topics;

Output.

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 pksqlc-oqz99-processing-log | 8          | 3
 stocks                      | 3          | 3
---------------------------------------------------------------

You can also further inspect topic data with the PRINT statement which is simply an interface to a Kafka consumer within the ksqldb-cli.

ksql> print 'stocks' limit 5;

Output.

Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/06/04 20:02:49.104 Z, key: ZVV, value: {"side": "BUY", "quantity": 1725, "symbol": "ZVV", "price": 515, "account": "LMN456", "userid": "User_2"}, partition: 0
rowtime: 2021/06/04 20:02:49.878 Z, key: ZVV, value: {"side": "BUY", "quantity": 517, "symbol": "ZVV", "price": 874, "account": "LMN456", "userid": "User_7"}, partition: 0
rowtime: 2021/06/04 20:02:51.043 Z, key: ZWZZT, value: {"side": "SELL", "quantity": 3541, "symbol": "ZWZZT", "price": 221, "account": "LMN456", "userid": "User_7"}, partition: 0
rowtime: 2021/06/04 20:02:51.684 Z, key: ZWZZT, value: {"side": "BUY", "quantity": 3744, "symbol": "ZWZZT", "price": 475, "account": "XYZ789", "userid": "User_5"}, partition: 0
rowtime: 2021/06/04 20:02:52.586 Z, key: ZWZZT, value: {"side": "BUY", "quantity": 505, "symbol": "ZWZZT", "price": 570, "account": "XYZ789", "userid": "User_6"}, partition: 0
Topic printing ceased

The first thing I need to do to start developing queries for processing the stock data is to create a stream over the stocks topic.

CREATE STREAM STOCKS_STREAM WITH (
  KAFKA_TOPIC='stocks',
  VALUE_FORMAT='AVRO'
);

Ok with the STOCKS_STREAM stream created I can create a table to serve as a materialized view which will calculate an arbitrary high price thresh defined as the average stock price plus 90% of this value along with a low price threshold of the average stock price minus 90% for each company symbol.

CREATE TABLE STOCKS_HIGHLOW_TABLE
WITH (KAFKA_TOPIC='stocks_highlow_tbl') AS 
SELECT
  SYMBOL,
  AVG(PRICE) * 1.9 HIGH_THRESHOLD,
  AVG(PRICE) * 0.1 LOW_THRESHOLD
FROM STOCKS_STREAM
GROUP BY SYMBOL EMIT CHANGES;

Next I create a new stream that joins the original STOCKS_STREAM stream to the STOCKS_HIGHLOW_TABLE table and filter down just stock instances that are greater than the HIGH_THRESHOLD value or lower than the LOW_THRESHOLD value.

CREATE STREAM STOCK_THRESHOLDS_STREAM 
WITH (KAFKA_TOPIC='stocks_thresholds_chg') AS
SELECT 
  s.SYMBOL AS SYMBOL,
  s.PRICE AS PRICE,
  t.HIGH_THRESHOLD AS HIGH_THRESHOLD,
  t.LOW_THRESHOLD AS LOW_THRESHOLD,
  CASE
    WHEN s.PRICE > t.HIGH_THRESHOLD THEN 'HIGH PRICE'
    WHEN s.PRICE < t.LOW_THRESHOLD THEN 'LOW PRICE'
  END AS PRICE_DESCRIPTION
FROM STOCKS_STREAM s
  JOIN STOCKS_HIGHLOW_TABLE t ON s.SYMBOL = t.SYMBOL
WHERE s.PRICE > t.HIGH_THRESHOLD OR s.PRICE < t.LOW_THRESHOLD
EMIT CHANGES;

This leaves a stream of stocks being populated and filtered down to only those which have a low or high price crossing the thresholds calculated in the ksqlDB table and being persisted as the Kafka topic named stocks_thresholds_chg. I can utilize a Kafka Client application to consume from this stocks_thresholds_chg topic and any new events it receives are meaningful in that they represent significantly low or high stock prices.

This is in effect the combination of real-time analytics via the threshold calculation being performed in ksqlDB along with the makings for an event driven architecture whereby one or more decoupled applications can consume these events and do something meaningful in response to them.

I can inspect the stocks_thresholds_chg topic with the ksql CLI print statement.

print 'stocks_thresholds_chg' limit 5;

Output.

Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/06/04 21:45:18.158 Z, key: ZVV, value: {"PRICE": 16, "HIGH_THRESHOLD": 963.0579761904761, "LOW_THRESHOLD": 50.687261904761904, "PRICE_DESCRIPTION": "LOW PRICE"}, partition: 0
rowtime: 2021/06/04 21:45:21.812 Z, key: ZXZZT, value: {"PRICE": 955, "HIGH_THRESHOLD": 947.6771122320303, "LOW_THRESHOLD": 49.87774274905423, "PRICE_DESCRIPTION": "HIGH PRICE"}, partition: 2
rowtime: 2021/06/04 21:45:28.679 Z, key: ZXZZT, value: {"PRICE": 954, "HIGH_THRESHOLD": 947.6771122320303, "LOW_THRESHOLD": 49.87774274905423, "PRICE_DESCRIPTION": "HIGH PRICE"}, partition: 2
rowtime: 2021/06/04 21:45:37.117 Z, key: ZTEST, value: {"PRICE": 22, "HIGH_THRESHOLD": 967.7418137553256, "LOW_THRESHOLD": 50.93377967133293, "PRICE_DESCRIPTION": "LOW PRICE"}, partition: 2
rowtime: 2021/06/04 21:45:46.540 Z, key: ZTEST, value: {"PRICE": 36, "HIGH_THRESHOLD": 967.7418137553256, "LOW_THRESHOLD": 50.93377967133293, "PRICE_DESCRIPTION": "LOW PRICE"}, partition: 2
Topic printing ceased

Provisioning a Serverless Consumer Application in AWS Cloud

At this point I have established the data flow into Kafka using Confluent Cloud services within the AWS Cloud such as Kafka Connect Datagen Source Connector to generate fake stock data and wrote stream processing logic using ksqlDB to determine meaningful low and high stock prices. Now I progress onto implementing a flexible serverless consumer application deployed to the AWS Cloud using my own AWS Cloud Account which is separate from the AWS Cloud account running the Confluent Cloud Account services.

These days I've grown quite fond of the AWS Serverless Application Model (SAM) framework for rapidly developing and deploying Lambda functions with countless integrations. If you choose to follow along with this tutorial please use the following AWS Docs to install the AWS SAM CLI to install and configure the CLI.

First step is to initialize a new SAM project for a Python based application like so.

sam init --name sam-kafka-sink-consumer \
   --runtime python3.8 \
   --package-type Zip \
   --app-template hello-world

This will generate a bunch of starter files, most of which I will actually delete, within a directory named sam-kafka-sink-consumer. I will delete the following directories and rename the hello_world directory to sink_consumer which will contain the source code for the consumer application.

cd sam-kafka-sink-consumer
rm -r events
rm -r tests
mv hello_world sink_consumer

Which leaves me with the following directory structure.

.
├── README.md
├── __init__.py
├── sink_consumer
│   ├── __init__.py
│   ├── app.py
│   └── requirements.txt
└── template.yaml

Next I replace the contents of the template.yaml file with the following AWS resource definitions.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-kafka-sink-consumer

Parameters:
  StocksSnsTopicName:
    Type: String
    Default: high-low-stocks
  SnsSubscriptionEmail:
    Type: String

Resources:
  ConfluentSinkUser:
    Type: AWS::IAM::User
    Properties:
      UserName: ConfluentSinkUser
      Policies: 
        - PolicyDocument: {
            "Version": "2012-10-17",
            "Statement": [{
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunction",
                "lambda:InvokeAsync"],
            "Resource": "*"
            }]
          }
          PolicyName: ConfluentSinkUserPolicy
  ConfluentSinkUserAccessKey:
    Type: AWS::IAM::AccessKey
    Properties:
      UserName: !Ref ConfluentSinkUser

  StockAlertsSnsTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Ref StocksSnsTopicName
      Subscription:
        - Endpoint: !Ref SnsSubscriptionEmail
          Protocol: email

  SinkConsumerFunction:
    Type: AWS::Serverless::Function
    DependsOn: StockAlertsSnsTopic
    Properties:
      CodeUri: sink_consumer/
      Handler: app.lambda_handler
      Runtime: python3.8
      Timeout: 30
      MemorySize: 512
      Environment:
        Variables:
          SNS_TOPIC: !Ref StockAlertsSnsTopic
      Policies:
        - SNSPublishMessagePolicy:
            TopicName: !GetAtt StockAlertsSnsTopic.TopicName

  SinkConsumerFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName:
        Fn::Join:
          - ''
          - - /aws/lambda/
            - Ref: SinkConsumerFunction
      RetentionInDays: 3

Outputs:
  StockAlertsSnsTopic:
    Value: !Ref StockAlertsSnsTopic
  ConfluentSinkUserAccessKey:
    Value: !Ref ConfluentSinkUserAccessKey
  ConfluentSinkUserSecret:
    Value: !GetAtt ConfluentSinkUserAccessKey.SecretAccessKey
  SinkConsumerFunction:
    Value: !Ref SinkConsumerFunction

The resources being created in this SAM / CloudFormation template include an IAM User and Access Key/Secret pair which are to be used by the AWS Lambda Sink Connector in Confluent Cloud, an SNS topic for sending Email alerts to notify of new stock high / low values and, an AWS Lambda function with Python 3.8 runtime to accept the Kafka events and send them to the SNS topic for email notifications.

Next I add the Python source code for consuming the events into the app.py file as shown below.

import base64
import json
import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(events, context):
    logger.info({'event': events})

    # use only the most recent stock update if 
    # multiple arrive in same batch
    latest_prices = {}
    for evt in events:
        value = evt['payload']['value']
        key = base64.b64decode(evt['payload']['key']).decode('utf-8')
        value.update(SYMBOL=key)
        value.update(timestamp=evt['payload']['timestamp'])

        existing_price = latest_prices.get(key)
        if not existing_price or existing_price['timestamp'] < value['timestamp']:
            latest_prices[key] = value

    logger.info({'latest_prices': latest_prices})

    # assemble message for email notification
    messages = []
    for stock_highlow in latest_prices.values():
        symbol = stock_highlow['SYMBOL']
        desc = stock_highlow['PRICE_DESCRIPTION']
        price = stock_highlow['PRICE']
        low_threshold = stock_highlow['LOW_THRESHOLD']
        high_threshold = stock_highlow['HIGH_THRESHOLD']
        messages.append(f"{symbol} has new {desc} of ${price} with thresholds ({low_threshold} - {high_threshold})")
    
    logger.info({'messages': messages})

    # send email via SNS topic
    sns = boto3.client('sns')
    msg = '\n'.join(messages)
    try:
        sns.publish(
            TopicArn=os.environ['SNS_TOPIC'],
            Message=msg,
            Subject='Lambda Stocks Stream Alert'
        )
    except Exception as e:
        logger.error({
            'error': 'failed_sns_publish',
            'exception': str(e),
            'topic': os.environ['SNS_TOPIC'],
            'message': msg
        })
        raise e

Then in the same directory as the template.yaml file I build the project.

sam build --use-container

Following that I deploy it with the following interactive SAM deploy command which will ask me for a valid email address to send the email alerts to then I an accpet the defaults for the remaining.

sam deploy --guided --stack-name sam-kafka-sink-consumer \
  --region us-east-2 --capabilities CAPABILITY_NAMED_IAM

After the command is finished the resources will be deployed to the AWS Cloud and their will be some output as shown below that is required for configuring the AWS Lambda Sink Connector in the Confluent Cloud.

CloudFormation outputs from deployed stack
----------------------------------------------------------------------------------------
Outputs                                                                                           
----------------------------------------------------------------------------------------
Key                 ConfluentSinkUserAccessKey                                                    
Description         -                                                                             
Value               YOUR-KEY                                                          

Key                 StockAlertsSnsTopic                                                           
Description         -                                                                             
Value               arn:aws:sns:us-east-2:YOUR-ACCOUNT-ID:high-low-stocks                            

Key                 ConfluentSinkUserSecret                                                       
Description         -                                                                             
Value               YOUR-SECRET                                      

Key                 SinkConsumerFunction                                                          
Description         -                                                                             
Value               YOUR-FUNCTION-NAME                    
----------------------------------------------------------------------------------------

Your values for YOUR-KEY YOUR-SECRET and YOUR-FUNCTION-NAME will be different obviously since these are placeholders I've filled in.

Launch AWS Lambda Sink Connector in Confluent Cloud

The last peice to this puzzle is to configure the AWS Lambda Sink Connector in Confluent Cloud to pull data from the stocks_thresholds_chg topic and feed it to the AWS Lambda function created in the last step.

A good place to start figuring out what information is needed to launch the connector is the ccloud connector-catalog describe command.

ccloud connector-catalog describe LambdaSink

Output.

Following are the required configs:
connector.class: LambdaSink
input.data.format : ["input.data.format" is required Value "null" doesn't belong to the property's "input.data.format" enum Value "null" is not a valid "Input message format" type]
name : ["name" is required]
kafka.api.key : ["kafka.api.key" is required]
kafka.api.secret : ["kafka.api.secret" is required]
aws.access.key.id : ["aws.access.key.id" is required]
aws.secret.access.key : ["aws.secret.access.key" is required]
aws.lambda.function.name : ["aws.lambda.function.name" is required]
tasks.max : ["tasks.max" is required]
topics : [org.apache.kafka.common.config.ConfigException: Must configure one of topics or topics.regex "topics" is required]

However, for more information its a good idea to have a look at the official Lambda Sink Connector docs on Confluent.

I place the following configs in a file named lambda-connector-config.json as shown below.

{
   "name": "stocks-highlow-lambda-sink",
	"connector.class": "LambdaSink",
	"input.data.format": "AVRO",
   "kafka.api.key": "AOGURDQHU4H4643L",
   "kafka.api.secret": "KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj",
	"aws.access.key.id": "YOUR-KEY",
	"aws.secret.access.key": "YOUR-SECRET",
	"aws.lambda.function.name": "YOUR-FUNCTION-NAME",
	"aws.lambda.invocation.type": "async",
	"tasks.max": "1",
	"topics": "stocks_thresholds_chg"
}

Then I use the ccloud CLI to create the Connector inside Confluent Cloud.

ccloud connector create --config lambda-connector-config.json --cluster lkc-x88n1

Verify the status of the connector to make sure it progresses past the PROVISIONING state and into the RUNNING state.

ccloud connector list

Output.

     ID     |            Name            | Status  |  Type  | Trace
+-----------+----------------------------+---------+--------+-------+
  lcc-ppmxm | stocks-highlow-lambda-sink | RUNNING | sink   |
  lcc-v2m6z | stocks-datagen             | RUNNING | source |

Once the connector reaches the running state I can check the AWS SAM logs to verify that the Lambda Sink Conumer function is being invoked.

To tail the logs run the following command in the same directory as the SAM template.yaml

sam logs --stack-name sam-kafka-sink-consumer --name SinkConsumerFunction --tail

I also start seeing email notifications informating me of new stock quotes.

Cleanup

At this point my tutorial is over and I can tear down the resources I've provisioned in my AWS Cloud Account as well as my Confluent Cloud Account.

To start I'll delete the SAM Application resources by nuking the underlying CloudFormation stack.

aws cloudformation delete-stack --stack-name sam-kafka-sink-consume --region us-east-2

Then I can remove the Confluent Cloud resources starting with the connectors.

ccloud connector delete lcc-ppmxm
ccloud connector delete lcc-v2m6z

Then I'll delete the ksqlDB app.

ccloud ksql app delete lksqlc-311jm

After that I delete the Kafka Cluster.

ccloud kafka cluster delete lkc-x88n1

Follow that up by deleting the serverless-stocks-demo environment.

ccloud environment delete env-g526n

And last but not least the service account I created for Kafka Clients can be deleted.

ccloud service-account delete 244423

At this point I have purged all the resources I created for this tutorial :)

Resources for Learning More

thecodinginterface.com earns commision from sales of linked products such as the book suggestions above. This enables providing high quality, frequent, and most importantly FREE tutorials and content for readers interested in Software Engineering so, thank you for supporting the authors of these resources as well as thecodinginterface.com

Conclusion

In this article I have presented a demonstration of how one can build an end-to-end fully Cloud Native Serverless streaming data pipeline for both real-time analytical computation as well as event driven architecture using Confluent Cloud and Amazon Web Services Cloud.

As always, thanks for reading and please do not hesitate to critique or comment below.

Share with friends and colleagues

[[ likes ]] likes

Community favorites for Data Engineering

theCodingInterface