Here I present an end-to-end example of a serverless, event-driven architecture that uses Confluent Cloud for stream processing paired with AWS Lambda, built with the Serverless Application Model (SAM) framework, for event-responsive logic. Together these pieces compose a fictitious financial stock quote email alerting system. The use case assumes that you already have an active Confluent Cloud account as well as an AWS account. Once the two accounts are ready to go, the process can be described stepwise as follows.
1. Download and install the Confluent Cloud ccloud CLI
2. Create Kafka Cluster and User API Key with ccloud CLI
3. Create a Kafka Topic for the Stock Data
4. Create Service Account and API Keys for Kafka Clients
5. Enable the Confluent Schema Registry in Confluent Cloud
6. Create ksqlDB Application using ccloud CLI
7. Launch Datagen Connector in Confluent Cloud for Stock Data Generation
8. Stream Processing with ksqlDB
9. Provision a Serverless Consumer Application in AWS Cloud
10. Launch AWS Lambda Sink Connector in Confluent Cloud
The flow of data among the components and environments of this system is depicted in the diagram below.
All cloud service configuration and provisioning for this tutorial will be done through interactive CLI commands and Infrastructure as Code (IaC). I've found this is important for two reasons. First, cloud-based service providers like AWS and Confluent have a history of continuously updating their UIs, so screenshots of specific screens and settings often become inaccurate too quickly to be useful. Second, in practice it is much more realistic and valuable to use automated methods such as CLI commands or IaC than to click around in a UI.
Excited? Then let's get started!
Confluent already has fantastic ccloud CLI installation docs, so please refer to them if you haven't installed it yet.
However, I do have a word of caution. Confluent Cloud is a relatively new service offering and Confluent is rapidly evolving it. I cannot guarantee that the CLI commands used here will remain stable / backwards compatible forever, but I am reasonably sure they will be more stable than the UI (even though I must admit it's a pretty slick UI ... hat tip to the Confluent frontend dev team).
I've listed the version of the ccloud CLI I'm using below.
ccloud --version
Version output.
ccloud version v1.32.0
Do make sure you've logged in with ccloud using your desired Confluent Cloud account credentials (email and password).
ccloud login --prompt
Confluent Cloud comes with a default environment, but for this tutorial I will use a fresh one. This way I can isolate the Kafka cluster, ksqlDB application, service accounts, and associated API keys to just this demo, which I can then throw away once the demo is done.
Create an environment.
ccloud environment create serverless-stocks-demo
Output will look similar to this but with different environment ID.
+------------------+------------------------+
| Environment Name | serverless-stocks-demo |
| Id | env-g526n |
+------------------+------------------------+
Set the ccloud CLI to use the newly created environment.
ccloud environment use env-g526n
Now I can create a basic Kafka Cluster named stocks-cluster in the AWS cloud within the us-east-2 region as shown below.
ccloud kafka cluster create stocks-cluster --cloud aws --region us-east-2 --type basic
This produces output metadata about the new cluster.
It may take up to 5 minutes for the Kafka cluster to be ready.
+--------------+---------------------------------------------------------+
| Id | lkc-x88n1 |
| Name | stocks-cluster |
| Type | BASIC |
| Ingress | 100 |
| Egress | 100 |
| Storage | 5000 |
| Provider | aws |
| Availability | single-zone |
| Region | us-east-2 |
| Status | UP |
| Endpoint | SASL_SSL://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 |
| ApiEndpoint | https://pkac-4vnm0.us-east-2.aws.confluent.cloud |
| RestEndpoint | |
+--------------+---------------------------------------------------------+
Next I use the Cluster ID of the newly created cluster to configure the ccloud CLI to use it as the default cluster for subsequent commands.
ccloud kafka cluster use lkc-x88n1
I need to make a user API key for administrative tasks like creating ksqlDB applications for the cluster along with creating and configuring topics in the Kafka cluster itself.
ccloud api-key create --resource lkc-x88n1
The output should be similar to this.
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | AOGURDQHU4H4643L |
| Secret | KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj |
+---------+------------------------------------------------------------------+
As the warning says, you will need to wait a couple of minutes before progressing as the API Keys take some time to provision. You should also store the secret somewhere safe (I'll be deleting mine along with this entire serverless-stocks-demo environment).
This is a pretty simple step in which I demonstrate how to create a topic named stocks that will eventually receive stock data from the Confluent Datagen connector.
ccloud kafka topic create stocks --partitions 3 --cluster lkc-x88n1
In this section I demonstrate how to create a service account for regular ole Kafka clients to produce data to or consume data from Kafka topics. While this isn't strictly required for this example use case, I have found it to be a helpful exercise in understanding how service accounts are used with Access Control Lists (ACLs) and how to configure Kafka clients to properly authenticate with Confluent Cloud.
The following creates a service account.
ccloud service-account create stocks-cluster-sa --description "For client applications"
Output.
+-------------+-------------------------+
| Id | 244423 |
| Resource ID | sa-lg0ow1 |
| Name | stocks-cluster-sa |
| Description | For client applications |
+-------------+-------------------------+
Next I create a key/secret pair associated with the service account to be used for authentication in client applications for the previously created Kafka cluster.
ccloud api-key create --service-account 244423 --resource lkc-x88n1
Output.
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | M5TTN4C3IFCXJ7BE |
| Secret | IlRwRJul7ds1JrE8lUiRYo+cXM2J7pxEqu/MsCf6ktETVTadmULrdrt35rAx4+YL |
+---------+------------------------------------------------------------------+
I then use the key/secret pair values along with the Kafka cluster endpoint to create a properties file for authenticating standard Kafka CLI Clients.
The Kafka Cluster endpoint can be retrieved like so.
ccloud kafka cluster describe lkc-x88n1
Output.
+--------------+---------------------------------------------------------+
| Id | lkc-x88n1 |
| Name | stocks-cluster |
| Type | BASIC |
| Ingress | 100 |
| Egress | 100 |
| Storage | 5000 |
| Provider | aws |
| Availability | single-zone |
| Region | us-east-2 |
| Status | UP |
| Endpoint | SASL_SSL://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 |
| ApiEndpoint | https://pkac-4vnm0.us-east-2.aws.confluent.cloud |
| RestEndpoint | |
+--------------+---------------------------------------------------------+
Then using this information I create a client-auth.properties file as shown below.
bootstrap.servers=pkc-ep9mm.us-east-2.aws.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="M5TTN4C3IFCXJ7BE" password="IlRwRJul7ds1JrE8lUiRYo+cXM2J7pxEqu/MsCf6ktETVTadmULrdrt35rAx4+YL";
However, before I can use those credentials and the associated service account to write data to the stocks topic or consume data from it, I have to assign the service account a set of ACLs.
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation WRITE --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation READ --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation DESCRIBE --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation DESCRIBE_CONFIGS --topic 'stocks'
ccloud kafka acl create --allow --service-account 244423 --cluster lkc-x88n1 --operation READ --consumer-group '*'
As for working with the Kafka CLI clients, you can follow along by downloading the Confluent Platform Community tools or just the standard Apache Kafka distribution.
I will use the standard CLI clients that come with the Apache Kafka open source binary distribution. If I download the Apache Kafka 2.12-2.7.0 Scala binary distribution and extract it to my local directory, then producing some stock data looks as follows.
cd kafka_2.12-2.7.0/
./bin/kafka-console-producer.sh --bootstrap-server pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 --topic stocks \
--producer.config /path/to/client-auth.properties \
--property parse.key=true --property key.separator=:
Enter some random stock quote values for the companies ACME and HOOLI then CTRL+C to terminate the producer.
>ACME:{"symbol":"ACME","quote":101.01}
>HOOLI:{"symbol":"HOOLI","quote":54.28}
>ACME:{"symbol":"ACME","quote":101.99}
Now I can authenticate with the CLI consumer and consume the data that was produced to the Confluent Cloud Kafka Cluster.
./bin/kafka-console-consumer.sh --bootstrap-server pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 --topic stocks \
--consumer.config /path/to/client-auth.properties \
--property print.key=true --from-beginning
With output (CTRL+C to exit).
ACME {"symbol":"ACME","quote":101.01}
HOOLI {"symbol":"HOOLI","quote":54.28}
ACME {"symbol":"ACME","quote":101.99}
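As an aside, the same authentication settings from client-auth.properties can be used from a regular Python client via the confluent-kafka package. Below is a minimal sketch (not required for the rest of the tutorial) that produces a single quote to the stocks topic; the bootstrap server, API key, and secret mirror the values shown above, and any data produced this way gets cleared out in the next step anyway.

import json

from confluent_kafka import Producer

# Connection settings mirror client-auth.properties for the stocks-cluster-sa
# service account created earlier.
conf = {
    'bootstrap.servers': 'pkc-ep9mm.us-east-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'M5TTN4C3IFCXJ7BE',
    'sasl.password': 'IlRwRJul7ds1JrE8lUiRYo+cXM2J7pxEqu/MsCf6ktETVTadmULrdrt35rAx4+YL',
}

producer = Producer(conf)

def delivery_report(err, msg):
    # Report per-message delivery success or failure.
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}]')

quote = {'symbol': 'ACME', 'quote': 101.01}
producer.produce('stocks', key='ACME', value=json.dumps(quote), callback=delivery_report)
producer.flush()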
The simple dummy data I just produced with the kafka-console-producer.sh program doesn't fit the schema of the stock data to be used with the Confluent Datagen connector later on, so I'll clear it out by deleting and recreating the topic.
Delete it.
ccloud kafka topic delete stocks --cluster lkc-x88n1
Recreate it.
ccloud kafka topic create stocks --partitions 3 --cluster lkc-x88n1
It's actually really important to enable the Schema Registry before launching any ksqlDB apps. If you do this the other way around, previously launched ksqlDB apps will not be able to recognize the Schema Registry.
It's generally a good idea to utilize the Schema Registry to provide both flexibility as well as validation of your data representations. Enabling the Schema Registry is as simple as specifying the cloud provider you want it to run in (AWS for this tutorial) as well as the geographic region, which I'll specify as US to go along with the us-east-2 region I placed the Kafka cluster in.
ccloud schema-registry cluster enable --cloud aws --geo us
This outputs the URL for the REST API of the Schema Registry.
+--------------+--------------------------------------------------+
| Id | lsrc-rxmgp |
| Endpoint URL | https://psrc-vrpp5.us-east-2.aws.confluent.cloud |
+--------------+--------------------------------------------------+
Then a separate API Key / Secret pair is needed for the Confluent Schema Registry which again can be created using the ccloud CLI.
ccloud api-key create --resource lsrc-rxmgp
Output.
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | BSILNNZVYIVEEJFH |
| Secret | V1G4KgnPQkRL48hjNrVisHfbgqOByIhLVPbYZmVrSDatY1AK0l73V+yFdjrsPl5F |
+---------+------------------------------------------------------------------+
After waiting a few minutes I create a ksqlDB application named stocks-ksql.
ccloud ksql app create stocks-ksql --cluster lkc-x88n1 \
--api-key AOGURDQHU4H4643L \
--api-secret KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj
The output should be similar to the following.
+--------------+--------------------------------------------------------+
| Id | lksqlc-311jm |
| Name | stocks-ksql |
| Topic Prefix | pksqlc-oqz99 |
| Kafka | lkc-x88n1 |
| Storage | 500 |
| Endpoint | https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443 |
| Status | PROVISIONING |
+--------------+--------------------------------------------------------+
Something that is not immediately obvious about the above command is that a service account is created for the ksqlDB application, which you can see by listing the service accounts like so.
ccloud service-account list
Giving output similar to this.
Id | Resource ID | Name | Description
+--------+-------------+-------------------+--------------------------------+
244411 | sa-l69ro2 | KSQL.lksqlc-311jm | SA for KSQL w/ ID lksqlc-311jm
| | | and Name stocks-ksql
Service accounts are generally associated with a particular application such as a Java or Python client program producing messages to Kafka or consuming messages from Kafka, a Kafka Streams application, and, of course, a ksqlDB application.
In an earlier post, How to Use Kafka Connect Datagen with Docker and Confluent Schema Registry, I demonstrated how to use the Kafka Connect Datagen connector with a local Docker provisioned Confluent Platform environment. This section will be very similar, but instead of running the connector plugin locally I deploy it to Confluent Cloud.
The DatagenSource connector available in Confluent Cloud doesn't support custom schema definitions (as of the time of this writing), so you must select from one of the predefined quickstart templates. I will be using the STOCK_TRADES quickstart template, whose schema definition you can find in the GitHub Kafka Connect Datagen repository.
Confluent Cloud comes with a number of standard Confluent certified connectors, which you can list with the command below, and, of course, DatagenSource is among them.
ccloud connector-catalog list
Output.
PluginName | Type
+--------------------------+--------+
MicrosoftSqlServerSink | sink
MySqlCdcSource | source
PubSubSource | source
MongoDbAtlasSink | sink
MySqlSource | source
AzureEventHubsSource | source
KinesisSource | source
ElasticsearchSink | sink
S3_SINK | sink
PostgresCdcSource | source
SalesforceCdcSource | source
PostgresSource | source
OracleDatabaseSource | source
MicrosoftSqlServerSource | source
RedshiftSink | sink
SnowflakeSink | sink
DatagenSource | source
MongoDbAtlasSource | source
LambdaSink | sink
SqlServerCdcSource | source
MySqlSink | sink
PostgresSink | sink
For details on how to configure the Datagen Source connector you can describe it using the ccloud CLI or browse the docs on Confluent.
ccloud connector-catalog describe DatagenSource
Output.
Following are the required configs:
connector.class: DatagenSource
name : ["name" is required]
kafka.api.key : ["kafka.api.key" is required]
kafka.api.secret : ["kafka.api.secret" is required]
kafka.topic : [Missing required configuration "kafka.topic" which has no default value. "kafka.topic" is required]
output.data.format : ["output.data.format" is required Value "null" doesn't belong to the property's "output.data.format" enum Value "null" is not a valid "Output message format" type]
quickstart : ["quickstart" is required Value "null" is not a valid "Quickstart" type]
tasks.max : ["tasks.max" is required]
Armed with this information on the configuration properties, I create a connector-config.json file with the following specifications to generate stock data using the Datagen connector.
{
"name": "stocks-datagen",
"connector.class": "DatagenSource",
"kafka.api.key": "AOGURDQHU4H4643L",
"kafka.api.secret": "KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj",
"kafka.topic": "stocks",
"output.data.format": "AVRO",
"quickstart": "STOCK_TRADES",
"max.interval":"1000",
"tasks.max": "1"
}
To create the connector I have a few options:
1) I can create it through the Confluent Cloud UI.
2) I can use the ccloud CLI to create the connector using the connector-config.json file just created, as demonstrated below.
ccloud connector create --config connector-config.json --cluster lkc-x88n1
Then, after creating it, you can view its status with the following.
ccloud connector list
Output.
ID | Name | Status | Type | Trace
+-----------+----------------+---------+--------+-------+
lcc-rxm51 | stocks-datagen | RUNNING | source |
3) I can use the connect REST API endpoint with a standard HTTP Client like HTTPie.
In order to use the REST API I need a cloud resource scoped key/secret pair, which is created as shown below.
ccloud api-key create --resource cloud
Output.
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | 3LTWD2R2LXHKWHUQ |
| Secret | LZ6jqQKYuTZZreBp2ltcnE2wjHnmvAZHKR9j2Z9hs1oSHxMAfwDeIpRs8QMUJQrl |
+---------+------------------------------------------------------------------+
Then I use the environment and cluster IDs along with the newly created cloud API key/secret pair to construct a PUT HTTP request as shown below.
ENV_ID=env-g526n
CLUSTER_ID=lkc-x88n1
CONNECTOR_NAME=stocks-datagen
REST_URL="https://api.confluent.cloud/connect/v1/environments/$ENV_ID/clusters/$CLUSTER_ID/connectors/$CONNECTOR_NAME/config"
AUTH="3LTWD2R2LXHKWHUQ:LZ6jqQKYuTZZreBp2ltcnE2wjHnmvAZHKR9j2Z9hs1oSHxMAfwDeIpRs8QMUJQrl"
http PUT $REST_URL @connector-config.json --auth $AUTH
Output.
{
"config": {
"cloud.environment": "prod",
"cloud.provider": "aws",
"connector.class": "DatagenSource",
"internal.kafka.endpoint": "PLAINTEXT://kafka-0.kafka.pkc-ep9mm.svc.cluster.local:9071,kafka-1.kafka.pkc-ep9mm.svc.cluster.local:9071,kafka-2.kafka.pkc-ep9mm.svc.cluster.local:9071",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"kafka.dedicated": "false",
"kafka.endpoint": "SASL_SSL://pkc-ep9mm.us-east-2.aws.confluent.cloud:9092",
"kafka.region": "us-east-2",
"kafka.topic": "stocks",
"max.interval": "1000",
"name": "stocks-datagen",
"output.data.format": "AVRO",
"quickstart": "STOCK_TRADES",
"tasks.max": "1",
"valid.kafka.api.key": "true"
},
"name": "stocks-datagen",
"tasks": [],
"type": "source"
}
Similar to the ccloud CLI usage you can use the REST API to view the status of the connector.
REST_URL="https://api.confluent.cloud/connect/v1/environments/$ENV_ID/clusters/$CLUSTER_ID/connectors/$CONNECTOR_NAME/status"
http $REST_URL --auth $AUTH
Output.
{
"connector": {
"state": "RUNNING",
"trace": "",
"worker_id": "stocks-datagen"
},
"name": "stocks-datagen",
"tasks": [
{
"id": 0,
"msg": "",
"state": "RUNNING",
"worker_id": "stocks-datagen"
}
],
"type": "source"
}
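If HTTPie isn't available, the same two Connect REST calls can be made with Python's requests library. Here is a minimal sketch using the same environment ID, cluster ID, connector name, and cloud API key/secret as above.

import json

import requests

ENV_ID = 'env-g526n'
CLUSTER_ID = 'lkc-x88n1'
CONNECTOR_NAME = 'stocks-datagen'
AUTH = ('3LTWD2R2LXHKWHUQ', 'LZ6jqQKYuTZZreBp2ltcnE2wjHnmvAZHKR9j2Z9hs1oSHxMAfwDeIpRs8QMUJQrl')
BASE = f'https://api.confluent.cloud/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}/connectors/{CONNECTOR_NAME}'

# Create or update the connector from the same connector-config.json file.
with open('connector-config.json') as fp:
    config = json.load(fp)

create_resp = requests.put(f'{BASE}/config', json=config, auth=AUTH)
print(create_resp.status_code, create_resp.json())

# Check the connector status.
status_resp = requests.get(f'{BASE}/status', auth=AUTH)
print(status_resp.json())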
In this section I use ksqlDB to find high and low stock prices, defined as percentages above and below the average stock price for each company, from the data generated by the DatagenSource connector. As new high or low stock prices are found, I dump them into a new stream and corresponding topic, which can then be consumed and reacted to by sending an email alert.
First things first, I create a new API key for the ksqlDB application using the service account that was created automatically when the ksqlDB app was created. I can easily find that using the following ccloud command.
ccloud service-account list
Output below shows the service account has ID 244411.
Id | Resource ID | Name | Description
+--------+-------------+-------------------+--------------------------------+
244411 | sa-l69ro2 | KSQL.lksqlc-311jm | SA for KSQL w/ ID lksqlc-311jm
| | | and Name stocks-ksql
244423 | sa-lg0ow1 | stocks-cluster-sa | For client applications
I also need the ksqlDB application resource ID, so let me use ccloud to find that as well.
ccloud ksql app list
Output below shows the ksqlDB app ID of lksqlc-311jm.
Id | Name | Topic Prefix | Kafka | Storage | Endpoint | Status
+--------------+-------------+--------------+-----------+---------+--------------------------------------------------------+--------+
lksqlc-311jm | stocks-ksql | pksqlc-oqz99 | lkc-x88n1 | 500 | https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443 | UP
Then I create the ksqlDB specific API key/secret pair for the service account.
ccloud api-key create --service-account 244411 --resource lksqlc-311jm
Output, which should be saved somewhere safe because the secret cannot be retrieved again.
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | SGZBQVP6SMSFSFNG |
| Secret | 12ivUbttwinXr3TMcUJL4vYFE4RCg1d1vaqri6Ztz6M7oJsNbLxd2sHbzk+bhPGr |
+---------+------------------------------------------------------------------+
I now have the credentials established and can begin using the ksql CLI to construct and execute the queries needed for stream processing.
If you have downloaded and installed the Confluent Platform Community tools you can use the ksql CLI directly.
ksql -u SGZBQVP6SMSFSFNG \
-p 12ivUbttwinXr3TMcUJL4vYFE4RCg1d1vaqri6Ztz6M7oJsNbLxd2sHbzk+bhPGr \
https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443
Or if you prefer Docker you can use the ksqldb-cli Docker image provided by Confluent like so.
docker run --rm -it confluentinc/ksqldb-cli:0.18.0 ksql -u SGZBQVP6SMSFSFNG \
-p 12ivUbttwinXr3TMcUJL4vYFE4RCg1d1vaqri6Ztz6M7oJsNbLxd2sHbzk+bhPGr \
https://pksqlc-oqz99.us-east-2.aws.confluent.cloud:443
From within the ksql CLI shell you can list topics.
ksql> show topics;
Output.
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
pksqlc-oqz99-processing-log | 8 | 3
stocks | 3 | 3
---------------------------------------------------------------
You can also further inspect topic data with the PRINT statement which is simply an interface to a Kafka consumer within the ksqldb-cli.
ksql> print 'stocks' limit 5;
Output.
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/06/04 20:02:49.104 Z, key: ZVV, value: {"side": "BUY", "quantity": 1725, "symbol": "ZVV", "price": 515, "account": "LMN456", "userid": "User_2"}, partition: 0
rowtime: 2021/06/04 20:02:49.878 Z, key: ZVV, value: {"side": "BUY", "quantity": 517, "symbol": "ZVV", "price": 874, "account": "LMN456", "userid": "User_7"}, partition: 0
rowtime: 2021/06/04 20:02:51.043 Z, key: ZWZZT, value: {"side": "SELL", "quantity": 3541, "symbol": "ZWZZT", "price": 221, "account": "LMN456", "userid": "User_7"}, partition: 0
rowtime: 2021/06/04 20:02:51.684 Z, key: ZWZZT, value: {"side": "BUY", "quantity": 3744, "symbol": "ZWZZT", "price": 475, "account": "XYZ789", "userid": "User_5"}, partition: 0
rowtime: 2021/06/04 20:02:52.586 Z, key: ZWZZT, value: {"side": "BUY", "quantity": 505, "symbol": "ZWZZT", "price": 570, "account": "XYZ789", "userid": "User_6"}, partition: 0
Topic printing ceased
The first thing I need to do to start developing queries for processing the stock data is to create a stream over the stocks topic.
CREATE STREAM STOCKS_STREAM WITH (
KAFKA_TOPIC='stocks',
VALUE_FORMAT='AVRO'
);
Ok, with the STOCKS_STREAM stream created I can create a table to serve as a materialized view that calculates, for each company symbol, an arbitrary high price threshold defined as the average stock price plus 90% of that value, along with a low price threshold of the average stock price minus 90%. For example, a symbol with an average price of 500 gets a high threshold of 950 and a low threshold of 50.
CREATE TABLE STOCKS_HIGHLOW_TABLE
WITH (KAFKA_TOPIC='stocks_highlow_tbl') AS
SELECT
SYMBOL,
AVG(PRICE) * 1.9 HIGH_THRESHOLD,
AVG(PRICE) * 0.1 LOW_THRESHOLD
FROM STOCKS_STREAM
GROUP BY SYMBOL EMIT CHANGES;
Next I create a new stream that joins the original STOCKS_STREAM stream to the STOCKS_HIGHLOW_TABLE table and filters down to just the stock events whose price is greater than the HIGH_THRESHOLD value or lower than the LOW_THRESHOLD value.
CREATE STREAM STOCK_THRESHOLDS_STREAM
WITH (KAFKA_TOPIC='stocks_thresholds_chg') AS
SELECT
s.SYMBOL AS SYMBOL,
s.PRICE AS PRICE,
t.HIGH_THRESHOLD AS HIGH_THRESHOLD,
t.LOW_THRESHOLD AS LOW_THRESHOLD,
CASE
WHEN s.PRICE > t.HIGH_THRESHOLD THEN 'HIGH PRICE'
WHEN s.PRICE < t.LOW_THRESHOLD THEN 'LOW PRICE'
END AS PRICE_DESCRIPTION
FROM STOCKS_STREAM s
JOIN STOCKS_HIGHLOW_TABLE t ON s.SYMBOL = t.SYMBOL
WHERE s.PRICE > t.HIGH_THRESHOLD OR s.PRICE < t.LOW_THRESHOLD
EMIT CHANGES;
This leaves a stream of stock events, filtered down to only those whose price crosses the low or high threshold calculated in the ksqlDB table, persisted to a Kafka topic named stocks_thresholds_chg. I can utilize a Kafka client application to consume from this stocks_thresholds_chg topic, and any new events it receives are meaningful in that they represent significantly low or high stock prices.
This is, in effect, the combination of real-time analytics via the threshold calculation performed in ksqlDB along with the makings of an event-driven architecture whereby one or more decoupled applications can consume these events and do something meaningful in response to them.
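To make that idea of a decoupled consumer concrete, below is a minimal sketch of a standalone Python consumer subscribed to the stocks_thresholds_chg topic. It assumes the client service account has also been granted READ on this topic (an additional ccloud kafka acl create would be needed), and since the topic values are Avro serialized a production consumer would pair this with a Schema Registry aware deserializer; here only the raw key and value size are printed.

from confluent_kafka import Consumer

# Same SASL settings as the earlier client, plus a consumer group.
conf = {
    'bootstrap.servers': 'pkc-ep9mm.us-east-2.aws.confluent.cloud:9092',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'M5TTN4C3IFCXJ7BE',
    'sasl.password': 'IlRwRJul7ds1JrE8lUiRYo+cXM2J7pxEqu/MsCf6ktETVTadmULrdrt35rAx4+YL',
    'group.id': 'stocks-threshold-alerts',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(conf)
consumer.subscribe(['stocks_thresholds_chg'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        # React to the threshold-crossing event, e.g. trigger a notification.
        print(f'{msg.key().decode("utf-8")}: {len(msg.value())} Avro-encoded bytes')
finally:
    consumer.close()

In this tutorial, however, the consuming application is the AWS Lambda function built in the next section, with the Lambda Sink Connector doing the consuming on my behalf.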
I can inspect the stocks_thresholds_chg topic with the ksql CLI print statement.
print 'stocks_thresholds_chg' limit 5;
Output.
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/06/04 21:45:18.158 Z, key: ZVV, value: {"PRICE": 16, "HIGH_THRESHOLD": 963.0579761904761, "LOW_THRESHOLD": 50.687261904761904, "PRICE_DESCRIPTION": "LOW PRICE"}, partition: 0
rowtime: 2021/06/04 21:45:21.812 Z, key: ZXZZT, value: {"PRICE": 955, "HIGH_THRESHOLD": 947.6771122320303, "LOW_THRESHOLD": 49.87774274905423, "PRICE_DESCRIPTION": "HIGH PRICE"}, partition: 2
rowtime: 2021/06/04 21:45:28.679 Z, key: ZXZZT, value: {"PRICE": 954, "HIGH_THRESHOLD": 947.6771122320303, "LOW_THRESHOLD": 49.87774274905423, "PRICE_DESCRIPTION": "HIGH PRICE"}, partition: 2
rowtime: 2021/06/04 21:45:37.117 Z, key: ZTEST, value: {"PRICE": 22, "HIGH_THRESHOLD": 967.7418137553256, "LOW_THRESHOLD": 50.93377967133293, "PRICE_DESCRIPTION": "LOW PRICE"}, partition: 2
rowtime: 2021/06/04 21:45:46.540 Z, key: ZTEST, value: {"PRICE": 36, "HIGH_THRESHOLD": 967.7418137553256, "LOW_THRESHOLD": 50.93377967133293, "PRICE_DESCRIPTION": "LOW PRICE"}, partition: 2
Topic printing ceased
At this point I have established the data flow into Kafka using Confluent Cloud services within the AWS Cloud: the Kafka Connect DatagenSource connector generates fake stock data, and the stream processing logic written in ksqlDB determines meaningful low and high stock prices. Now I progress to implementing a flexible serverless consumer application deployed to the AWS Cloud using my own AWS account, which is separate from the AWS account running the Confluent Cloud services.
These days I've grown quite fond of the AWS Serverless Application Model (SAM) framework for rapidly developing and deploying Lambda functions with countless integrations. If you choose to follow along with this tutorial, please use the AWS docs for the AWS SAM CLI to install and configure the CLI.
The first step is to initialize a new SAM project for a Python based application like so.
sam init --name sam-kafka-sink-consumer \
--runtime python3.8 \
--package-type Zip \
--app-template hello-world
This will generate a bunch of starter files, most of which I will actually delete, within a directory named sam-kafka-sink-consumer. I delete the following directories and rename the hello_world directory to sink_consumer, which will contain the source code for the consumer application.
cd sam-kafka-sink-consumer
rm -r events
rm -r tests
mv hello_world sink_consumer
Which leaves me with the following directory structure.
.
├── README.md
├── __init__.py
├── sink_consumer
│ ├── __init__.py
│ ├── app.py
│ └── requirements.txt
└── template.yaml
Next I replace the contents of the template.yaml file with the following AWS resource definitions.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sam-kafka-sink-consumer
Parameters:
StocksSnsTopicName:
Type: String
Default: high-low-stocks
SnsSubscriptionEmail:
Type: String
Resources:
ConfluentSinkUser:
Type: AWS::IAM::User
Properties:
UserName: ConfluentSinkUser
Policies:
- PolicyDocument: {
"Version": "2012-10-17",
"Statement": [{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
"lambda:GetFunction",
"lambda:InvokeAsync"],
"Resource": "*"
}]
}
PolicyName: ConfluentSinkUserPolicy
ConfluentSinkUserAccessKey:
Type: AWS::IAM::AccessKey
Properties:
UserName: !Ref ConfluentSinkUser
StockAlertsSnsTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Ref StocksSnsTopicName
Subscription:
- Endpoint: !Ref SnsSubscriptionEmail
Protocol: email
SinkConsumerFunction:
Type: AWS::Serverless::Function
DependsOn: StockAlertsSnsTopic
Properties:
CodeUri: sink_consumer/
Handler: app.lambda_handler
Runtime: python3.8
Timeout: 30
MemorySize: 512
Environment:
Variables:
SNS_TOPIC: !Ref StockAlertsSnsTopic
Policies:
- SNSPublishMessagePolicy:
TopicName: !GetAtt StockAlertsSnsTopic.TopicName
SinkConsumerFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName:
Fn::Join:
- ''
- - /aws/lambda/
- Ref: SinkConsumerFunction
RetentionInDays: 3
Outputs:
StockAlertsSnsTopic:
Value: !Ref StockAlertsSnsTopic
ConfluentSinkUserAccessKey:
Value: !Ref ConfluentSinkUserAccessKey
ConfluentSinkUserSecret:
Value: !GetAtt ConfluentSinkUserAccessKey.SecretAccessKey
SinkConsumerFunction:
Value: !Ref SinkConsumerFunction
The resources created in this SAM / CloudFormation template include: an IAM user and access key/secret pair to be used by the AWS Lambda Sink Connector in Confluent Cloud; an SNS topic for sending email alerts of new stock high / low values; and an AWS Lambda function with the Python 3.8 runtime to accept the Kafka events and publish them to the SNS topic for email notification.
Next I add the Python source code for consuming the events into the app.py file as shown below.
import base64
import json
import logging
import os
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(events, context):
logger.info({'event': events})
# use only the most recent stock update if
# multiple arrive in same batch
latest_prices = {}
for evt in events:
value = evt['payload']['value']
key = base64.b64decode(evt['payload']['key']).decode('utf-8')
value.update(SYMBOL=key)
value.update(timestamp=evt['payload']['timestamp'])
existing_price = latest_prices.get(key)
if not existing_price or existing_price['timestamp'] < value['timestamp']:
latest_prices[key] = value
logger.info({'latest_prices': latest_prices})
# assemble message for email notification
messages = []
for stock_highlow in latest_prices.values():
symbol = stock_highlow['SYMBOL']
desc = stock_highlow['PRICE_DESCRIPTION']
price = stock_highlow['PRICE']
low_threshold = stock_highlow['LOW_THRESHOLD']
high_threshold = stock_highlow['HIGH_THRESHOLD']
messages.append(f"{symbol} has new {desc} of ${price} with thresholds ({low_threshold} - {high_threshold})")
logger.info({'messages': messages})
# send email via SNS topic
sns = boto3.client('sns')
msg = '\n'.join(messages)
try:
sns.publish(
TopicArn=os.environ['SNS_TOPIC'],
Message=msg,
Subject='Lambda Stocks Stream Alert'
)
except Exception as e:
logger.error({
'error': 'failed_sns_publish',
'exception': str(e),
'topic': os.environ['SNS_TOPIC'],
'message': msg
})
raise e
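The handler above assumes each invocation receives a batch of records where payload.key is a base64 encoded Kafka key, payload.timestamp is the record timestamp, and payload.value holds the ksqlDB columns. The sketch below is purely illustrative, inferred from the handler code and the ksqlDB output seen earlier rather than taken from the official Lambda Sink Connector docs, but it is handy for exercising the parsing logic locally.

import base64

# Illustrative sample batch shaped the way the handler expects (not an official
# Lambda Sink Connector payload reference).
sample_events = [
    {
        'payload': {
            'key': base64.b64encode(b'ZXZZT').decode('utf-8'),
            'timestamp': 1622843121812,
            'value': {
                'PRICE': 955,
                'HIGH_THRESHOLD': 947.68,
                'LOW_THRESHOLD': 49.88,
                'PRICE_DESCRIPTION': 'HIGH PRICE',
            },
        }
    }
]

# The handler decodes the base64 key back into the stock symbol.
symbol = base64.b64decode(sample_events[0]['payload']['key']).decode('utf-8')
print(symbol)  # ZXZZT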
Then, in the same directory as the template.yaml file, I build the project. Note that boto3 is available in the Lambda Python 3.8 runtime by default, so there is nothing extra to add to requirements.txt.
sam build --use-container
Following that, I deploy it with the interactive SAM deploy command below, which will ask me for a valid email address to send the email alerts to; then I can accept the defaults for the remaining prompts.
sam deploy --guided --stack-name sam-kafka-sink-consumer \
--region us-east-2 --capabilities CAPABILITY_NAMED_IAM
After the command finishes, the resources will be deployed to the AWS Cloud and there will be some output, shown below, that is required for configuring the AWS Lambda Sink Connector in Confluent Cloud.
CloudFormation outputs from deployed stack
----------------------------------------------------------------------------------------
Outputs
----------------------------------------------------------------------------------------
Key ConfluentSinkUserAccessKey
Description -
Value YOUR-KEY
Key StockAlertsSnsTopic
Description -
Value arn:aws:sns:us-east-2:YOUR-ACCOUNT-ID:high-low-stocks
Key ConfluentSinkUserSecret
Description -
Value YOUR-SECRET
Key SinkConsumerFunction
Description -
Value YOUR-FUNCTION-NAME
----------------------------------------------------------------------------------------
Your values for YOUR-KEY, YOUR-SECRET, and YOUR-FUNCTION-NAME will obviously be different since these are placeholders I've filled in.
The last piece of this puzzle is to configure the AWS Lambda Sink Connector in Confluent Cloud to pull data from the stocks_thresholds_chg topic and feed it to the AWS Lambda function created in the last step.
A good place to start figuring out what information is needed to launch the connector is the ccloud connector-catalog describe command.
ccloud connector-catalog describe LambdaSink
Output.
Following are the required configs:
connector.class: LambdaSink
input.data.format : ["input.data.format" is required Value "null" doesn't belong to the property's "input.data.format" enum Value "null" is not a valid "Input message format" type]
name : ["name" is required]
kafka.api.key : ["kafka.api.key" is required]
kafka.api.secret : ["kafka.api.secret" is required]
aws.access.key.id : ["aws.access.key.id" is required]
aws.secret.access.key : ["aws.secret.access.key" is required]
aws.lambda.function.name : ["aws.lambda.function.name" is required]
tasks.max : ["tasks.max" is required]
topics : [org.apache.kafka.common.config.ConfigException: Must configure one of topics or topics.regex "topics" is required]
However, for more information it's a good idea to have a look at the official Lambda Sink Connector docs on Confluent.
I place the following configs in a file named lambda-connector-config.json as shown below.
{
"name": "stocks-highlow-lambda-sink",
"connector.class": "LambdaSink",
"input.data.format": "AVRO",
"kafka.api.key": "AOGURDQHU4H4643L",
"kafka.api.secret": "KNUrgllSgX5a+OXGmw0OVW1RNEwMTwEgfSOcF0iBRA8R5BzG2wt642c1tgXiwRfj",
"aws.access.key.id": "YOUR-KEY",
"aws.secret.access.key": "YOUR-SECRET",
"aws.lambda.function.name": "YOUR-FUNCTION-NAME",
"aws.lambda.invocation.type": "async",
"tasks.max": "1",
"topics": "stocks_thresholds_chg"
}
Then I use the ccloud CLI to create the Connector inside Confluent Cloud.
ccloud connector create --config lambda-connector-config.json --cluster lkc-x88n1
Verify the status of the connector to make sure it progresses past the PROVISIONING state and into the RUNNING state.
ccloud connector list
Output.
ID | Name | Status | Type | Trace
+-----------+----------------------------+---------+--------+-------+
lcc-ppmxm | stocks-highlow-lambda-sink | RUNNING | sink |
lcc-v2m6z | stocks-datagen | RUNNING | source |
Once the connector reaches the RUNNING state I can check the AWS SAM logs to verify that the Lambda sink consumer function is being invoked.
To tail the logs, run the following command in the same directory as the SAM template.yaml.
sam logs --stack-name sam-kafka-sink-consumer --name SinkConsumerFunction --tail
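Alternatively, the same logs can be pulled straight from CloudWatch Logs with boto3, a minimal sketch of which is below. The log group name follows the /aws/lambda/<function name> pattern created by the SinkConsumerFunctionLogGroup resource in the template; substitute YOUR-FUNCTION-NAME with the SinkConsumerFunction value from the stack outputs.

import boto3

logs = boto3.client('logs', region_name='us-east-2')

# Fetch a handful of recent log events for the sink consumer function.
response = logs.filter_log_events(
    logGroupName='/aws/lambda/YOUR-FUNCTION-NAME',
    limit=20,
)

for event in response['events']:
    print(event['message'].rstrip())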
I also start seeing email notifications informing me of new high and low stock prices.
At this point my tutorial is over and I can tear down the resources I've provisioned in my AWS Cloud Account as well as my Confluent Cloud Account.
To start I'll delete the SAM Application resources by nuking the underlying CloudFormation stack.
aws cloudformation delete-stack --stack-name sam-kafka-sink-consumer --region us-east-2
Then I can remove the Confluent Cloud resources starting with the connectors.
ccloud connector delete lcc-ppmxm
ccloud connector delete lcc-v2m6z
Then I'll delete the ksqlDB app.
ccloud ksql app delete lksqlc-311jm
After that I delete the Kafka Cluster.
ccloud kafka cluster delete lkc-x88n1
Follow that up by deleting the serverless-stocks-demo environment.
ccloud environment delete env-g526n
And last but not least the service account I created for Kafka Clients can be deleted.
ccloud service-account delete 244423
At this point I have purged all the resources I created for this tutorial :)
In this article I have presented a demonstration of how one can build an end-to-end, fully cloud native, serverless streaming data pipeline for both real-time analytical computation and event-driven architecture using Confluent Cloud and the AWS Cloud.
As always, thanks for reading and please do not hesitate to critique or comment below.