Pipelining Kafka Events into Snowflake with Dockerized Kafka Connect

In this article I show how to implement a data pipeline using Dockerized Kafka Connect and the Snowflake Sink Connector to pull events out of Apache Kafka and into the Snowflake Cloud Data Warehouse. To demonstrate this I use a Snowflake trial account along with a Docker Compose environment composed of Confluent Community Edition Docker images for Apache Kafka, Confluent Schema Registry, and Kafka Connect.

The complete code for this project can be found on my GitHub account.

Kafka Connector User Setup

1) Create an encrypted private key for authenticating to Snowflake (openssl will prompt for a passphrase; I use Develop3r throughout).

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

Assign the private key's contents to a shell variable named SNOWFLAKE_PRIVATE_KEY. The value is everything between the "-----BEGIN ENCRYPTED PRIVATE KEY-----" header line and the "-----END ENCRYPTED PRIVATE KEY-----" footer line, collapsed onto a single line.

SNOWFLAKE_PRIVATE_KEY=$(echo `sed -e '2,$!d' -e '$d' -e 's/\n/ /g' rsa_key.p8`|tr -d ' ') 

2) Extract the corresponding public key (openssl will prompt for the private key's passphrase).

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Assign the public key's contents to a shell variable named SNOWFLAKE_PUBLIC_KEY, which is everything between the "-----BEGIN PUBLIC KEY-----" header line and the "-----END PUBLIC KEY-----" footer line, again collapsed onto a single line.

SNOWFLAKE_PUBLIC_KEY=$(echo `sed -e '2,$!d' -e '$d' -e 's/\n/ /g' rsa_key.pub`|tr -d ' ')
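
The two sed/tr one-liners above simply drop the PEM header and footer lines and collapse the base64 body onto a single line. If the shell quoting gets unwieldy, the same transformation can be sketched in a few lines of Python (function name is my own, not part of any tooling used here):

```python
def pem_body(pem_text: str) -> str:
    """Return the base64 body of a PEM block as a single line,
    dropping the -----BEGIN ...----- header and footer lines."""
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    return "".join(line for line in lines if not line.startswith("-----"))

# Usage sketch: pem_body(open("rsa_key.pub").read())
```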

3) Create Snowflake Objects

In this step I create the following Snowflake objects used to integrate the order events stored in Kafka into Snowflake via Kafka Connect.

  • Virtual warehouse named KAFKA_CONNECT
  • Database named ORDERS_DB
  • Schema within ORDERS_DB named ORDERS_SCHEMA
  • Role named KAFKA_CONNECT_ORDERS_ROLE with the following grants
    • USAGE on the ORDERS_DB database and ORDERS_SCHEMA schema
    • CREATE TABLE, CREATE STAGE, and CREATE PIPE on ORDERS_SCHEMA
  • User named ORDERS_CONNECT_USER with the KAFKA_CONNECT_ORDERS_ROLE granted to it

In this step I am creating everything from scratch, but that might not be what you want when working with existing tables and stages. In that case you would likely grant the role ownership of the table along with read and write access to the stage.
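
For the existing-objects case, the grants might look something like the following (the table and stage names here are placeholders for illustration, not objects created elsewhere in this article):

```sql
-- Hypothetical grants when reusing a pre-existing table and stage
GRANT OWNERSHIP ON TABLE ORDERS_DB.ORDERS_SCHEMA.EXISTING_ORDERS_TABLE
  TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
GRANT READ, WRITE ON STAGE ORDERS_DB.ORDERS_SCHEMA.EXISTING_ORDERS_STAGE
  TO ROLE KAFKA_CONNECT_ORDERS_ROLE;
```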

In a new worksheet within the Snowflake console execute the following SQL statements.

USE ROLE SYSADMIN;

CREATE WAREHOUSE KAFKA_CONNECT WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE;

USE WAREHOUSE KAFKA_CONNECT;

CREATE DATABASE ORDERS_DB;

USE DATABASE ORDERS_DB;

CREATE SCHEMA ORDERS_SCHEMA;

USE ROLE SECURITYADMIN;
CREATE ROLE KAFKA_CONNECT_ORDERS_ROLE;

GRANT USAGE ON WAREHOUSE KAFKA_CONNECT TO ROLE KAFKA_CONNECT_ORDERS_ROLE;

GRANT USAGE ON DATABASE ORDERS_DB TO ROLE KAFKA_CONNECT_ORDERS_ROLE;

USE ROLE SYSADMIN;
USE WAREHOUSE KAFKA_CONNECT;
USE DATABASE ORDERS_DB;

GRANT USAGE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;

GRANT CREATE TABLE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;

GRANT CREATE STAGE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;

GRANT CREATE PIPE ON SCHEMA ORDERS_SCHEMA TO ROLE KAFKA_CONNECT_ORDERS_ROLE;

USE ROLE SECURITYADMIN;

CREATE USER ORDERS_CONNECT_USER 
  PASSWORD = 'Develop3r'
  DEFAULT_WAREHOUSE = 'KAFKA_CONNECT'
  DEFAULT_NAMESPACE = 'ORDERS_DB.ORDERS_SCHEMA'
  RSA_PUBLIC_KEY = 'REPLACE WITH THE VALUE OF SNOWFLAKE_PUBLIC_KEY VARIABLE'
  DEFAULT_ROLE = 'KAFKA_CONNECT_ORDERS_ROLE';
  
GRANT ROLE KAFKA_CONNECT_ORDERS_ROLE TO USER ORDERS_CONNECT_USER;

Define Kafka Connect Docker Image and Service

Use the confluentinc/cp-kafka-connect-base:6.1.4 Kafka Connect image from Confluent in a Dockerfile within a new directory named kafka-connect-integration.

mkdir kafka-connect-integration
cd kafka-connect-integration

Then add the following Dockerfile, which installs the snowflakeinc/snowflake-kafka-connector:1.7.0 connector via the confluent-hub CLI tool provided in the Docker image.

FROM confluentinc/cp-kafka-connect-base:6.1.4

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars"

RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.7.0

Then update the docker-compose.yml file to include a new service definition named kafka-connect that points to this Dockerfile.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-connect:
    build: kafka-connect-integration
    container_name: kafka-connect
    hostname: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars

Launch the Docker Compose services.

docker-compose up

Start Producing Fictitious Order Events to Kafka

The Snowflake Sink Connector is an integration tool used to pipeline events from a Kafka cluster into the Snowflake data warehouse. So before I get much further I'll want some events present in Kafka to pipeline into Snowflake. To accomplish this I can use the Orders Avro producer I presented in the preceding article Kafka Clients in Java with Avro Serialization and Confluent Schema Registry. Since the Docker Compose services are already running I can simply launch the producer using Gradle like so.

./gradlew run --args="producer"

This will generate fake order events in the Avro data format shown below.

{
    "namespace": "com.thecodinginterface.avro.orders",
    "type": "record",
    "name": "OrderValue",
    "fields": [
        { "name": "id", "type": "string"},
        { "name": "amount", "type": "int"},
        { "name": "created",
          "type": {
              "type": "long",
              "logicalType": "local-timestamp-millis"
          }
        },
        {"name": "customer", "type": "string"},
        {"name": "creditcard", "type": "string"}
    ]
}
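
As a concrete illustration, a single order event conforming to this schema could be represented like the following (the field values here are fabricated for illustration, not actual producer output):

```python
import time
import uuid

# One fictitious order record shaped like the OrderValue schema above.
order = {
    "id": str(uuid.uuid4()),
    "amount": 4999,
    "created": int(time.time() * 1000),  # millisecond timestamp for local-timestamp-millis
    "customer": "Jane Doe",
    "creditcard": "4242",
}

# The record's fields line up one-to-one with the Avro schema's fields.
assert set(order) == {"id", "amount", "created", "customer", "creditcard"}
```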

Launch Snowflake Sink Connector

First I verify that the SnowflakeSinkConnector plugin is present by querying the Kafka Connect REST API (I'm using the HTTPie http client here).

http http://localhost:8083/connector-plugins

Output.

HTTP/1.1 200 OK
Content-Length: 393
Content-Type: application/json
Date: Tue, 18 Jan 2022 04:04:01 GMT
Server: Jetty(9.4.43.v20210629)

[
    {
        "class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "type": "sink",
        "version": "1.7.0"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
    }
]
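
The same check can be scripted by parsing that JSON response, for example with a short Python helper like this one (the function name and sample string are my own; the endpoint URL matches the port mapping configured above):

```python
import json

def has_plugin(plugins_json: str, connector_class: str) -> bool:
    """Return True when the given connector class appears in the JSON
    body returned by the /connector-plugins endpoint."""
    return any(p.get("class") == connector_class for p in json.loads(plugins_json))

# In practice the JSON body would come from something like:
#   urllib.request.urlopen("http://localhost:8083/connector-plugins").read()
sample = '[{"class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", "type": "sink", "version": "1.7.0"}]'
print(has_plugin(sample, "com.snowflake.kafka.connector.SnowflakeSinkConnector"))  # prints True
```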

Next I create the Snowflake Sink Connector configuration file named snowflake-connector.json, which is detailed in the Snowflake docs for launching a connector in distributed mode.

{
  "name":"OrdersAvroConnector",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"orders-avro",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"somealphanumeric.us-east-2.aws.snowflakecomputing.com:443",
    "snowflake.user.name":"ORDERS_CONNECT_USER",
    "snowflake.private.key":"REPLACE WITH THE VALUE OF SNOWFLAKE_PRIVATE_KEY VARIABLE",
    "snowflake.private.key.passphrase":"Develop3r",
    "snowflake.database.name":"ORDERS_DB",
    "snowflake.schema.name":"ORDERS_SCHEMA",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
  }
}
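
The snowflake.private.key placeholder has to be filled in with the actual key material before the file is submitted. Rather than pasting it by hand, it can be injected programmatically, for example with a small Python sketch like this (the function name is my own, and the usage comment assumes the SNOWFLAKE_PRIVATE_KEY variable from earlier is exported into the environment):

```python
import json

def inject_private_key(connector_config: dict, private_key: str) -> dict:
    """Return a copy of the connector config dict with the
    snowflake.private.key entry replaced by the actual key material."""
    updated = dict(connector_config)
    updated["config"] = {**connector_config["config"], "snowflake.private.key": private_key}
    return updated

# Usage sketch:
#   cfg = inject_private_key(json.load(open("snowflake-connector.json")),
#                            os.environ["SNOWFLAKE_PRIVATE_KEY"])
#   json.dump(cfg, open("snowflake-connector.json", "w"), indent=2)
```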

Lastly, I launch the Snowflake Sink Connector by issuing a POST request to the Kafka Connect REST API endpoint as shown below.

http POST http://localhost:8083/connectors @snowflake-connector.json

Output.

HTTP/1.1 201 Created
Content-Length: 2549
Content-Type: application/json
Date: Tue, 18 Jan 2022 04:44:38 GMT
Location: http://localhost:8083/connectors/OrdersAvroConnector
Server: Jetty(9.4.43.v20210629)

{
    "config": {
        "buffer.count.records": "10000",
        "buffer.flush.time": "60",
        "buffer.size.bytes": "5000000",
        "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "name": "OrdersAvroConnector",
        "snowflake.database.name": "ORDERS_DB",
        "snowflake.private.key": "REPLACE WITH THE VALUE OF SNOWFLAKE_PRIVATE_KEY VARIABLE",
        "snowflake.private.key.passphrase": "Develop3r",
        "snowflake.schema.name": "ORDERS_SCHEMA",
        "snowflake.url.name": "somealphanumeric.us-east-2.aws.snowflakecomputing.com:443",
        "snowflake.user.name": "ORDERS_CONNECT_USER",
        "tasks.max": "8",
        "topics": "orders-avro",
        "value.converter": "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    },
    "name": "OrdersAvroConnector",
    "tasks": [],
    "type": "sink"
}

I can now log in to the Snowflake instance using the snowsql CLI tool with the ORDERS_CONNECT_USER user and the private key like so.

snowsql -a somealphanumeric.us-east-2.aws -u ORDERS_CONNECT_USER --private-key-path=rsa_key.p8

Then list the tables to see the name of the table that was created by the connector.

ORDERS_CONNECT_USER#KAFKA_CONNECT@ORDERS_DB.ORDERS_SCHEMA>SHOW TABLES;
+-------------------------------+------------------------+---------------+---------------+-------+---------+------------+------+--------+---------------------------+----------------+----------------------+-----------------+-------------+
| created_on                    | name                   | database_name | schema_name   | kind  | comment | cluster_by | rows |  bytes | owner                     | retention_time | automatic_clustering | change_tracking | is_external |
|-------------------------------+------------------------+---------------+---------------+-------+---------+------------+------+--------+---------------------------+----------------+----------------------+-----------------+-------------|
| 2022-01-17 20:44:42.567 -0800 | ORDERS_AVRO_1814321382 | ORDERS_DB     | ORDERS_SCHEMA | TABLE |         |            | 2243 | 203264 | KAFKA_CONNECT_ORDERS_ROLE | 1              | OFF                  | OFF             | N           |
+-------------------------------+------------------------+---------------+---------------+-------+---------+------------+------+--------+---------------------------+----------------+----------------------+-----------------+-------------+

Now if I query the newly created ORDERS_AVRO_1814321382 table generated by the connector I get the following, proving that data is flowing from Kafka into Snowflake.

ORDERS_CONNECT_USER#KAFKA_CONNECT@ORDERS_DB.ORDERS_SCHEMA>select * from ORDERS_AVRO_1814321382 limit 3;
+--------------------------------------------------+------------------------------------------------+
| RECORD_METADATA                                  | RECORD_CONTENT                                 |
|--------------------------------------------------+------------------------------------------------|
| {                                                | {                                              |
|   "CreateTime": 1642479864337,                   |   "amount": 6996,                              |
|   "key": "78b91d3a-f09b-456c-a6e3-e742d098f194", |   "created": 1642458263853,                    |
|   "offset": 0,                                   |   "creditcard": "1428",                        |
|   "partition": 0,                                |   "customer": "Mindi Schultz",                 |
|   "schema_id": 1,                                |   "id": "78b91d3a-f09b-456c-a6e3-e742d098f194" |
|   "topic": "orders-avro"                         | }                                              |
| }                                                |                                                |
| {                                                | {                                              |
|   "CreateTime": 1642479865974,                   |   "amount": 9855,                              |
|   "key": "4c25e284-94fc-47f1-8cd9-994e387bfcad", |   "created": 1642458265973,                    |
|   "offset": 1,                                   |   "creditcard": "3325",                        |
|   "partition": 0,                                |   "customer": "Wendy Carroll Sr.",             |
|   "schema_id": 1,                                |   "id": "4c25e284-94fc-47f1-8cd9-994e387bfcad" |
|   "topic": "orders-avro"                         | }                                              |
| }                                                |                                                |
| {                                                | {                                              |
|   "CreateTime": 1642479866075,                   |   "amount": 8856,                              |
|   "key": "5cc3520c-f9a3-453e-a429-0914053dbc32", |   "created": 1642458266075,                    |
|   "offset": 2,                                   |   "creditcard": "4448",                        |
|   "partition": 0,                                |   "customer": "Kim Klocko",                    |
|   "schema_id": 1,                                |   "id": "5cc3520c-f9a3-453e-a429-0914053dbc32" |
|   "topic": "orders-avro"                         | }                                              |
| }                                                |                                                |
+--------------------------------------------------+------------------------------------------------+
3 Row(s) produced. Time Elapsed: 0.388s
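
The connector lands each message as two VARIANT columns, RECORD_METADATA and RECORD_CONTENT, so individual order fields can be pulled out with Snowflake's semi-structured path syntax. A query along these lines (against the generated table name above) flattens a few fields into regular columns:

```sql
SELECT
    RECORD_CONTENT:id::STRING       AS id,
    RECORD_CONTENT:customer::STRING AS customer,
    RECORD_CONTENT:amount::INT      AS amount,
    TO_TIMESTAMP_NTZ(RECORD_CONTENT:created::NUMBER, 3) AS created
FROM ORDERS_AVRO_1814321382
LIMIT 3;
```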

Conclusion

In this article I gave a practical example of how to use the Snowflake Sink Connector for Kafka to pipeline Avro-based Kafka events into Snowflake in a Docker Compose environment.

As always, I thank you for reading and please feel free to ask questions or critique in the comments section below.
