Processing Streams of Stock Quotes with Kafka and Confluent ksqlDB

Introduction

A recurring challenge in modern organizations is the need to deliver actionable, data-driven insights on increasingly short timelines (near real-time analytics) or to react in near real-time to stimuli in the world in a scalable and automated fashion (event-driven systems). This has driven me down the path of investigating various stream processing technologies. Naturally, Kafka and its ecosystem caught my attention, so I thought I'd share an interesting use case that I experimented with.

In this article I present an example of how one can use Kafka and the Confluent ksqlDB stream processing database to process a simplified dataset of fake stock quotes. The ultimate goal of this exercise is to use ksqlDB to inspect a stream of stock quotes for individual companies in one minute windows and identify when a window has introduced a new daily high or low stock price. When new daily highs and lows are found, they should be filtered down and emitted to a change log stream so that a downstream subscribing application can consume from that topic, essentially forming an event-driven system.

For demonstration I will be working with contrived datasets for the fictitious companies Acme Corp and Hooli, which I've saved to tab-separated files.

Here is the acme-stocks.tsv file.

symbol  quote low_day high_day
ACME  100.29  100.29  100.29
ACME  100.30  100.29  100.30
ACME  100.17  100.17  100.30
ACME  100.19  100.17  100.30
ACME  100.12  100.12  100.30
ACME  100.12  100.12  100.30
ACME  100.11  100.11  100.30
ACME  100.58  100.11  100.58
ACME  100.55  100.11  100.58
ACME  100.77  100.11  100.77
ACME  100.75  100.11  100.77
ACME  100.78  100.11  100.78
ACME  100.77  100.11  100.78
ACME  100.76  100.11  100.78
ACME  100.80  100.11  100.80
ACME  100.81  100.11  100.81
ACME  100.79  100.11  100.81
ACME  100.82  100.11  100.82
ACME  100.77  100.11  100.82
ACME  100.78  100.11  100.82
ACME  100.80  100.11  100.82

And here is the hooli-stocks.tsv file.

symbol  quote low_day high_day
HOOOLI  118.23  118.23  118.23
HOOOLI  118.37  118.23  118.37
HOOOLI  118.42  118.23  118.42
HOOOLI  118.41  118.23  118.42
HOOOLI  118.39  118.23  118.42
HOOOLI  118.37  118.23  118.42
HOOOLI  118.30  118.23  118.42
HOOOLI  118.25  118.23  118.42
HOOOLI  118.20  118.20  118.42
HOOOLI  118.11  118.11  118.42
HOOOLI  118.09  118.09  118.42
HOOOLI  118.02  118.02  118.42
HOOOLI  117.99  117.99  118.42
HOOOLI  117.95  117.95  118.42
HOOOLI  117.91  117.91  118.42
HOOOLI  117.92  117.91  118.42
HOOOLI  117.94  117.91  118.42
HOOOLI  117.89  117.89  118.42
HOOOLI  117.88  117.88  118.42
HOOOLI  117.90  117.88  118.42
HOOOLI  117.91  117.88  118.42

Project Setup

For easy local development and experimentation I like to use the Docker images provided by Confluent, similar to what is shown in Confluent's various Quick Start tutorials.

Below is the docker-compose.yaml file I will be working with.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.1.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.17.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.17.0
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

The natural next step would be to fire up this Docker Compose service like so.

docker-compose up -d

To push the contrived stock quote data into Kafka, where I can then play with it using ksqlDB, I am utilizing a simple Python-based producer. Therefore, I will go ahead and make a Python 3 virtual environment and pip install the confluent-kafka library like so.

python3 -m venv venv
source venv/bin/activate
(venv) pip install confluent-kafka
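
Before moving on, a quick sanity check confirms the client library installed correctly; confluent_kafka.version() reports the Python client version and confluent_kafka.libversion() the underlying librdkafka version.

(venv) python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"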

Publishing Stock Quotes to Kafka with a Python Producer

At this point I use the Kafka/Confluent CLI to create a topic named stocks to hold the raw stock quote data like so.

docker-compose exec kafka kafka-topics --bootstrap-server kafka:9092 --create \
	--topic stocks --partitions 1 --replication-factor 1
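
If you would rather stay in Python, the confluent-kafka package also provides an AdminClient that can create the topic programmatically. Below is a minimal sketch mirroring the CLI command above; it assumes the same local setup and connects through the PLAINTEXT_HOST listener exposed on localhost:29092.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:29092'})

# create_topics() returns a dict of topic name -> future; resolve each to surface errors
futures = admin.create_topics([NewTopic('stocks', num_partitions=1, replication_factor=1)])
for topic, future in futures.items():
    try:
        future.result()
        print(f'created topic {topic}')
    except Exception as err:
        print(f'failed to create topic {topic}: {err}')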

Then I can start producing the stock quotes presented earlier for Acme Corp and Hooli using a simple Python producer, the source code of which I've saved in a file named pyproducer.py as shown below.

import argparse
import json
import logging
import sys
import time

from confluent_kafka import Producer, KafkaError

logging.basicConfig(
  format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
  datefmt='%Y-%m-%d %H:%M:%S',
  level=logging.INFO,
  handlers=[
      logging.FileHandler("producer.log"),
      logging.StreamHandler(sys.stdout)
  ]
)


def ack_handler(err, msg):
    if err:
        logging.error({
            'message': 'failed to deliver message',
            'error': str(err)
        })
    else:
        logging.info({
            'message': 'successfully produced message',
            'topic': msg.topic(),
            'partition': msg.partition(),
            'offset': msg.offset()
        })


def parse_stocks(filepath):
    stocks = []
    with open(filepath) as fp:
        for i, line in enumerate(fp):
            line = line.strip()
            if i > 0 and line: # skip header row
                symbol, quote, low_day, high_day = line.split()
                stocks.append({
                    'symbol': symbol,
                    'quote': float(quote),
                    'low_day': float(low_day),
                    'high_day': float(high_day)
                })
    return stocks


def publish_quote(producer, topic, stock_quote):
    producer.produce(topic,
                    key=stock_quote['symbol'],
                    value=json.dumps(stock_quote),
                    on_delivery=ack_handler)


def publish_stocks(topic, publish_interval):
    configs = {
      'bootstrap.servers': 'localhost:29092',  # the PLAINTEXT_HOST listener advertised in docker-compose.yaml
      'linger.ms': 10,
      'partitioner': 'murmur2_random'
    }
    producer = Producer(configs)

    acme_stocks = parse_stocks('./acme-stocks.tsv')
    hooli_stocks = parse_stocks('./hooli-stocks.tsv')

    for i in range(len(acme_stocks)):
        acme = acme_stocks[i]
        hooli = hooli_stocks[i]

        publish_quote(producer, topic, acme)
        publish_quote(producer, topic, hooli)

        producer.poll(0)

        time.sleep(publish_interval)

    producer.flush()
    logging.info({'message': 'Thats all folks!'})


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', help='kafka topic to produce to')
    parser.add_argument('--publish-interval', help='time to wait, in seconds, between publishing stocks', type=int, default=19)
    args = parser.parse_args()
    
    print("Publishing stock data. Enter CTRL+C to exit.")
    while True:
        publish_stocks(args.topic, args.publish_interval)

The goal of this Python producer program is to read each line of stock quote data and publish it to the single-broker Kafka cluster running in Docker at an interval of 19 seconds, which means each one minute window should contain roughly three events (i.e., stock quotes) per company. This is a rate of data ingest that is slow enough for learning and demonstration yet fast enough to show the power of operating on data in motion.

I can run this as follows.

(venv) python pyproducer.py --topic stocks

Processing Stock Data with ksqlDB

In a separate terminal I can now fire up a ksqlDB CLI session with the following command.

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Once inside the ksqlDB shell I can verify that my stocks topic is there using the following.

ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas 
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1                  
 stocks                      | 1          | 1                  
---------------------------------------------------------------

Then I can do a simple inspection of the topic using the PRINT statement.

ksql> print 'stocks' limit 4;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/12 05:10:33.341 Z, key: ACME, value: {"symbol": "ACME", "quote": 100.3, "low_day": 100.29, "high_day": 100.3}, partition: 0
rowtime: 2021/05/12 05:10:33.341 Z, key: HOOOLI, value: {"symbol": "HOOOLI", "quote": 118.37, "low_day": 118.23, "high_day": 118.37}, partition: 0
rowtime: 2021/05/12 05:10:53.346 Z, key: ACME, value: {"symbol": "ACME", "quote": 100.17, "low_day": 100.17, "high_day": 100.3}, partition: 0
rowtime: 2021/05/12 05:10:53.346 Z, key: HOOOLI, value: {"symbol": "HOOOLI", "quote": 118.42, "low_day": 118.23, "high_day": 118.42}, partition: 0

To start doing productive work I create a ksqlDB stream backed by my stocks topic and its associated data like so.

CREATE STREAM stocks_stream (
  symbol VARCHAR KEY,
  quote DOUBLE,
  low_day DOUBLE,
  high_day DOUBLE
) WITH (
  KAFKA_TOPIC='stocks',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='JSON'
);

Next I create a ksqlDB table composed of windowed aggregates (one minute tumbling windows) for each company symbol. For each window I grab the daily low and high values of the window's first event along with the daily low and high of the window's last event.

CREATE TABLE stock_highlow_tbl
WITH (KAFKA_TOPIC='stock_highlow_tbl') AS SELECT
  symbol,
  EARLIEST_BY_OFFSET(low_day) window_start_low_day,
  LATEST_BY_OFFSET(low_day) window_end_low_day,
  EARLIEST_BY_OFFSET(high_day) window_start_high_day,
  LATEST_BY_OFFSET(high_day) window_end_high_day,
  LATEST_BY_OFFSET(quote) window_end_quote
FROM stocks_stream
WINDOW TUMBLING ( SIZE 1 MINUTE ) 
GROUP BY symbol
EMIT CHANGES;

At this point, if I list the topics again with the SHOW TOPICS statement, I should see the original stocks topic plus a new one backing stock_highlow_tbl, which I've similarly named stock_highlow_tbl using the WITH (KAFKA_TOPIC='...') clause.

ksql> SHOW topics;

 Kafka Topic                 | Partitions | Partition Replicas 
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1                  
 stock_highlow_tbl           | 1          | 1                  
 stocks                      | 1          | 1                  
---------------------------------------------------------------

It is worth inspecting the stock_highlow_tbl with a simple push query to have a look at the data before moving on.

ksql> SELECT *
>FROM stock_highlow_tbl
>EMIT CHANGES
>LIMIT 6;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|SYMBOL            |WINDOWSTART       |WINDOWEND         |WINDOW_START_LOW_D|WINDOW_END_LOW_DAY|WINDOW_START_HIGH_|WINDOW_END_HIGH_DA|WINDOW_END_QUOTE  |
|                  |                  |                  |AY                |                  |DAY               |Y                 |                  |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|ACME              |1620796800000     |1620796860000     |100.11            |100.11            |100.77            |100.78            |100.78            |
|HOOOLI            |1620796800000     |1620796860000     |118.09            |118.02            |118.42            |118.42            |118.02            |
|ACME              |1620796860000     |1620796920000     |100.11            |100.11            |100.78            |100.8             |100.8             |
|HOOOLI            |1620796860000     |1620796920000     |117.99            |117.91            |118.42            |118.42            |117.91            |
|ACME              |1620796920000     |1620796980000     |100.11            |100.11            |100.81            |100.82            |100.82            |
|HOOOLI            |1620796920000     |1620796980000     |117.91            |117.89            |118.42            |118.42            |117.89            |
Limit Reached
Query terminated

I mentioned that the above query was a special type of query known as a push query, but it's also worth pointing out that every SELECT ... query I've written thus far is actually a push query, so you may be wondering what that is all about. A push query is simply a query that executes continuously over an evolving stream of events or table of updates, which is pretty intuitive once you run one and see the continual query result set spilling into your terminal.
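
Push queries are not limited to the ksqlDB CLI. The ksqlDB server also exposes a REST API, and its /query endpoint streams push query results over a chunked HTTP response. Below is a rough sketch using the requests package (an extra dependency, so pip install requests first); it simply prints each line of the streamed response as it arrives and runs until interrupted with CTRL+C.

import requests

response = requests.post(
    'http://localhost:8088/query',
    headers={'Content-Type': 'application/vnd.ksql.v1+json'},
    json={
        'ksql': 'SELECT * FROM stocks_stream EMIT CHANGES;',
        'streamsProperties': {'ksql.streams.auto.offset.reset': 'earliest'}
    },
    stream=True
)

# The response is a long-lived chunked stream: a header object first,
# then one JSON fragment per result row. Print each non-empty line raw.
for line in response.iter_lines():
    if line:
        print(line.decode())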

There is a second type of query, exclusive to aggregate tables like the stock_highlow_tbl I created, called a pull query. A pull query lets you fetch specific rows of the table using the PRIMARY KEY in a WHERE clause predicate and then terminates, rather than pushing out the never-ending series of events associated with a push query. But you may be asking ... when did you define a PRIMARY KEY? In an aggregate table (sometimes referred to as a materialized table in the Confluent docs) the field you use in your GROUP BY becomes the PRIMARY KEY but, don't take my word for it, verify it yourself with the following DESCRIBE statement.

ksql> DESCRIBE stock_highlow_tbl;

Name                 : STOCK_HIGHLOW_TBL
 Field                 | Type
--------------------------------------------------------------------------------
 SYMBOL                | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 WINDOW_START_LOW_DAY  | DOUBLE
 WINDOW_END_LOW_DAY    | DOUBLE
 WINDOW_START_HIGH_DAY | DOUBLE
 WINDOW_END_HIGH_DAY   | DOUBLE
 WINDOW_END_QUOTE      | DOUBLE

 

A pull query to get all the stock quote windows accumulated so far in the stock_highlow_tbl for a given symbol would look like this.

ksql> SELECT * FROM stock_highlow_tbl WHERE symbol = 'ACME';
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|SYMBOL                |WINDOWSTART           |WINDOWEND             |WINDOW_START_LOW_DAY  |WINDOW_END_LOW_DAY    |WINDOW_START_HIGH_DAY |WINDOW_END_HIGH_DAY   |WINDOW_END_QUOTE      |
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|ACME                  |1621345140000         |1621345200000         |100.11                |100.11                |100.58                |100.58                |100.55                |
|ACME                  |1621345200000         |1621345260000         |100.11                |100.11                |100.77                |100.78                |100.78                |
|ACME                  |1621345260000         |1621345320000         |100.11                |100.11                |100.78                |100.8                 |100.8                 |
|ACME                  |1621345320000         |1621345380000         |100.11                |100.11                |100.81                |100.82                |100.82                |
|ACME                  |1621345380000         |1621345440000         |100.11                |100.11                |100.82                |100.82                |100.8                 |
|ACME                  |1621345440000         |1621345500000         |100.29                |100.17                |100.29                |100.3                 |100.17                |
|ACME                  |1621345500000         |1621345560000         |100.17                |100.12                |100.3                 |100.3                 |100.12                |
|ACME                  |1621345560000         |1621345620000         |100.11                |100.11                |100.3                 |100.58                |100.55                |
|ACME                  |1621345620000         |1621345680000         |100.11                |100.11                |100.77                |100.78                |100.78                |
|ACME                  |1621345680000         |1621345740000         |100.11                |100.11                |100.78                |100.8                 |100.8                 |
|ACME                  |1621345740000         |1621345800000         |100.11                |100.11                |100.81                |100.82                |100.82                |
|ACME                  |1621345800000         |1621345860000         |100.11                |100.11                |100.82                |100.82                |100.8                 |
|ACME                  |1621345860000         |1621345920000         |100.29                |100.17                |100.29                |100.3                 |100.17                |
|ACME                  |1621345920000         |1621345980000         |100.17                |100.12                |100.3                 |100.3                 |100.12                |
|ACME                  |1621345980000         |1621346040000         |100.11                |100.11                |100.3                 |100.58                |100.55                |
Query terminated

What I want to do now is generate a new stream from the change log topic down in the storage layer that was created from the results of the windowed ksqlDB table. You cannot create a stream directly from a ksqlDB table, which is why I must drop down to the lower-level topic in the storage layer rather than use the ksqlDB table abstraction in the processing layer. Also recall that the table and its backing topic were both named stock_highlow_tbl.

CREATE STREAM stock_windows_stream (
  symbol VARCHAR KEY,
  window_start_low_day DOUBLE,
  window_end_low_day DOUBLE,
  window_start_high_day DOUBLE,
  window_end_high_day DOUBLE,
  window_end_quote DOUBLE
) WITH (
  KAFKA_TOPIC='stock_highlow_tbl',
  VALUE_FORMAT='JSON'
);

Lastly, I create one final ksqlDB stream which filters the stock_windows_stream I just created down to only those one minute windows that introduced either a new daily high or a new daily low. I also enrich this stream by adding a derived event_type column stating whether the filtered window event was the result of a new high or a new low.

CREATE STREAM stock_changes_stream WITH (KAFKA_TOPIC='stock_changes_stream') AS
SELECT
  CASE
    WHEN window_start_low_day > window_end_low_day THEN 'new low'
    WHEN window_start_high_day < window_end_high_day THEN 'new high'
  END AS event_type,
  s.*
FROM stock_windows_stream s
WHERE window_start_low_day != window_end_low_day OR window_start_high_day != window_end_high_day
EMIT CHANGES;
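
For intuition, the CASE and WHERE logic above can be expressed in a few lines of plain Python. The classify_window helper below is purely illustrative (it is not part of the pipeline), but it makes it easy to sanity check the filter against the windowed values shown earlier.

def classify_window(start_low, end_low, start_high, end_high):
    """Mirror the CASE/WHERE logic of stock_changes_stream."""
    if start_low > end_low:
        return 'new low'     # daily low dropped within this window
    if start_high < end_high:
        return 'new high'    # daily high rose within this window
    return None              # no new daily extreme; the WHERE clause filters it out

# Values taken from the stock_highlow_tbl push query output above
print(classify_window(100.11, 100.11, 100.81, 100.82))  # new high (ACME)
print(classify_window(117.91, 117.89, 118.42, 118.42))  # new low (HOOOLI)
print(classify_window(100.11, 100.11, 100.82, 100.82))  # None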

Remember that streams and tables created as the result of a SELECT statement get persisted as regular ole Kafka topics. This is notable because I can now use a traditional Kafka consumer to subscribe to the stock_changes_stream topic, enabling a paradigm of reacting to new daily low and high stock prices as they flow into the topic from the final filtered stream. I will largely leave that exercise up to the reader (a minimal consumer sketch follows the query output below) and opt to simply show another push query since I'm right here in the ksqlDB shell.

ksql> SELECT * FROM stock_changes_stream EMIT CHANGES;
+-----------------+------------------+---------------------+---------------------+---------------------+---------------------+-------------------+
|SYMBOL           |EVENT_TYPE        |WINDOW_START_LOW_DAY |WINDOW_END_LOW_DAY   |WINDOW_START_HIGH_DAY|WINDOW_END_HIGH_DAY  |WINDOW_END_QUOTE   |
+-----------------+------------------+---------------------+---------------------+---------------------+---------------------+-------------------+
|ACME"            |new high          |100.11               |100.11               |100.81               |100.82               |100.82             |
|HOOOLI"          |new low           |117.91               |117.89               |118.42               |118.42               |117.89             |

Clean Up

When finished I can simply enter CTRL+C in the Python producer application's shell. I can do the same CTRL+C to terminate the push query on the stock_changes_stream stream, then type exit in the ksqlDB CLI shell, before finally issuing a "docker-compose down -v" command to destroy the Docker containers.

Conclusion

In this article I demonstrated a simple use case of stream processing using Kafka and Confluent's ksqlDB with some fictitious stock data. As always, thanks for reading and please do not hesitate to critique or comment below.
