How to Use Kafka Connect Datagen with Docker and Confluent Schema Registry

Introduction to kafka-connect-datagen Connector Plugin

In this How To article I demonstrate setting up a Docker Compose based deployment of the Community Components of the Confluent Platform, complete with the kafka-connect-datagen plugin for Kafka Connect, to generate test and/or development data useful for working with Kafka. The GitHub repo for the kafka-connect-datagen project describes it as a Kafka Connect plugin used to generate development or testing data and load it into Kafka. The connector is intended to be used with a local install of Confluent Platform or within a Docker container.

In order to use the kafka-connect-datagen connector, the following steps are necessary.

1) Install the Connector Plugin to Kafka Connect Service

To do this I will use Docker Compose to install and run the full suite of Confluent Platform Community components.

2) Define a Schema and Connector Plugin Configuration

The schema should be defined in a format compliant with Avro Random Generator which is used internally by the kafka-connect-datagen connector plugin.

Then use the schema specification to build the connector configuration parameters, which will need to be submitted to the Kafka Connect REST API.

3) Submit the Connector Configuration to Kafka Connect to Start Generating Data

Once the kafka-connect-datagen connector plugin configuration is defined, you wrap it in JSON form and submit it as an HTTP POST request to the Kafka Connect REST API, creating the connector and initiating a worker task.

4) Process the Data Generated by Connector using Kafka Client

With kafka-connect-datagen pumping randomly generated, customized data into Kafka, you can then process and consume it with any of the several Kafka clients available, such as the plain Kafka Consumer, a Kafka Streams application, or a Confluent ksqlDB application.


Running Confluent Platform Community Edition with Docker Compose

The fine folks at Confluent have generously supplied a public GitHub repository with several pre-written docker-compose.yml files, complete with reasonable parameter defaults and network configurations, so all we really need to do is copy one from the cp-all-in-one-community directory. For HTTP based interactions I have a personal preference for the HTTPie CLI client, so that is what I'll be using throughout this article.

First I download the community specific docker-compose.yml file.

http --download https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.1-post/cp-all-in-one-community/docker-compose.yml

Next I use Docker Compose to download the images and launch them as the containers and services specified in the docker-compose.yml file.

docker-compose up -d

When all services are reported as being up I am good to start using the Confluent Platform.

To get familiar with the Kafka Connect REST API I query it to verify that the Kafka Connect Datagen connector plugin is in fact installed. From the docker-compose.yml file you will see that the Kafka Connect container exposes port 8083 for its REST API.

http http://localhost:8083/connector-plugins | jq '.[].class'

Which then returns a list of pre-installed Kafka Connect plugins.

"io.confluent.kafka.connect.datagen.DatagenConnector"
"org.apache.kafka.connect.file.FileStreamSinkConnector"
"org.apache.kafka.connect.file.FileStreamSourceConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"

For a quick sanity check it's also worth verifying that no connectors are currently configured and running.

http http://localhost:8083/connectors

And the output should be an empty JSON array like so.

[]

Getting Familiar by using the Quick Start Predefined Schemas

Confluent packages a handful of predefined "quickstart" schemas for experimenting with the kafka-connect-datagen connector, and these feature frequently in the Confluent blog and documentation. For completeness I will start by demonstrating how to use the orders quickstart schema, even though the primary focus of this How To article is on defining your own schema.

To start I create a config file named datagen-orders-config.json for the connector, which will generate 1000 orders in JSON format into an orders_json topic, waiting at most one second (1000 milliseconds) between each.

{
  "name": "datagen-orders",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders_json",
    "quickstart": "orders",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 1000,
    "tasks.max": "1"
  }
}

This can be submitted to the Kafka Connect REST API to configure and run as a Connect task like so.

http POST http://localhost:8083/connectors @datagen-orders-config.json

Now if I fetch a listing of the connectors I should see the newly configured connector named datagen-orders.

http http://localhost:8083/connectors -b

Output.

[
    "datagen-orders"
]

I can verify it's running with the following.

http http://localhost:8083/connectors/datagen-orders/status -b

Output.

{
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "name": "datagen-orders",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect:8083"
        }
    ],
    "type": "source"
}
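Status payloads like this are easy to check programmatically. As a quick sketch (the helper function here is mine, not part of any Kafka client library), here's how the payload above could be validated in Python:

```python
import json

# Status payload as returned by GET /connectors/datagen-orders/status (shown above)
status = json.loads("""
{
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "name": "datagen-orders",
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "source"
}
""")

def connector_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    return (status["connector"]["state"] == "RUNNING"
            and all(task["state"] == "RUNNING" for task in status["tasks"]))

print(connector_healthy(status))  # True
```

The same check works on the paused and resumed payloads shown later, since they share the same structure.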

Then I can consume the test data with the kafka-console-consumer CLI utility.

docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic orders_json --property print.key=true

Example output (yours will vary).

559	{"ordertime":1516736262774,"orderid":559,"itemid":"Item_258","orderunits":8.01080375034306,"address":{"city":"City_69","state":"State_42","zipcode":32690}}
560	{"ordertime":1490281818110,"orderid":560,"itemid":"Item_941","orderunits":0.6616359019581093,"address":{"city":"City_55","state":"State_66","zipcode":15578}}
561	{"ordertime":1494592637748,"orderid":561,"itemid":"Item_420","orderunits":4.5526290218329555,"address":{"city":"City_78","state":"State_21","zipcode":69310}}

The Kafka Connect REST API provides the ability to pause a connector.

http PUT http://localhost:8083/connectors/datagen-orders/pause -b
http http://localhost:8083/connectors/datagen-orders/status -b

Output.

{
    "connector": {
        "state": "PAUSED",
        "worker_id": "connect:8083"
    },
    "name": "datagen-orders",
    "tasks": [
        {
            "id": 0,
            "state": "PAUSED",
            "worker_id": "connect:8083"
        }
    ],
    "type": "source"
}

As well as resume it.

http PUT http://localhost:8083/connectors/datagen-orders/resume -b
http http://localhost:8083/connectors/datagen-orders/status -b

Output shows running again.

{
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "name": "datagen-orders",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect:8083"
        }
    ],
    "type": "source"
}

Naturally you can delete the connector as well.

http DELETE http://localhost:8083/connectors/datagen-orders -b

Specifying Your Own Custom Schema

The pre-canned quickstart schemas are definitely valuable for getting up and going with the kafka-connect-datagen connector plugin but, in my opinion, the true value is the ability to generate mock data in the same shape, form, and range of values as the events you are working with in your specific projects.

As an example, let's say I have a use case to ingest real-time TV commercial ratings, rated from 1 to 5, for commercials of different durations in seconds (30, 45, 60). In this case I'd define an Avro Random Generator compliant schema as shown below. (One caveat: the Avro Random Generator treats a range's max bound as exclusive, so min 1 and max 5 yield ratings of 1 through 4; bump max to 6 if you need 5s to appear.)

{
  "type": "record",
  "name": "commercialrating",
  "fields": [
    {
      "name": "brand",
      "type": {
        "type": "string",
        "arg.properties": {
          "options": ["Acme", "Globex"]
        }
      }
    }, 
    {
      "name": "duration",
      "type": {
        "type": "int",
        "arg.properties": {
          "options": [30, 45, 60]
        }
      }
    },
    {
      "name": "rating",
      "type": {
        "type": "int",
        "arg.properties": {
          "range": { "min": 1, "max": 5 }
        }
      } 
    }
  ]
}
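Connector configs need this schema collapsed into a single escaped JSON string for the "schema.string" parameter. Rather than hand-escaping quotes, a small Python sketch can generate it, assuming you keep the schema above as a dict (or load it from a file):

```python
import json

# The Avro Random Generator schema from above, as a Python dict
schema = {
    "type": "record",
    "name": "commercialrating",
    "fields": [
        {"name": "brand",
         "type": {"type": "string", "arg.properties": {"options": ["Acme", "Globex"]}}},
        {"name": "duration",
         "type": {"type": "int", "arg.properties": {"options": [30, 45, 60]}}},
        {"name": "rating",
         "type": {"type": "int", "arg.properties": {"range": {"min": 1, "max": 5}}}},
    ],
}

# json.dumps collapses the schema onto one line; embedding that string in the
# connector config JSON produces the escaped "schema.string" value
schema_string = json.dumps(schema)
print(json.dumps({"schema.string": schema_string}, indent=2))
```

The round trip is lossless, so you can always recover the readable schema with json.loads on the escaped string.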

Next I create a new connector config file named datagen-json-commercials-config.json, but this time, instead of specifying the "quickstart" parameter, I use "schema.string" to embed the schema definition above, along with "schema.keyfield" to specify the field within the schema that should be used as the message key. Again, this will generate 1000 commercial ratings, at most one every second (1000 milliseconds), into a Kafka topic named commercials_json.

{
  "name": "datagen-commercials-json",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "commercials_json",
    "schema.string": "{\"type\":\"record\",\"name\":\"commercialrating\",\"fields\":[{\"name\":\"brand\",\"type\":{\"type\": \"string\",\"arg.properties\":{\"options\":[\"Acme\",\"Globex\"]}}},{\"name\":\"duration\",\"type\":{\"type\":\"int\",\"arg.properties\":{\"options\": [30, 45, 60]}}},{\"name\":\"rating\",\"type\":{\"type\":\"int\",\"arg.properties\":{\"range\":{\"min\":1,\"max\":5}}}}]}",
    "schema.keyfield": "brand",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 1000,
    "tasks.max": "1"
  }
}

Creating and starting the datagen-commercials-json Connector worker is exactly like what was shown for the orders quick start.

http POST http://localhost:8083/connectors @datagen-json-commercials-config.json -b

Output.

{
    "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "iterations": "1000",
        "kafka.topic": "commercials_json",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "max.interval": "1000",
        "name": "datagen-commercials-json",
        "schema.keyfield": "brand",
        "schema.string": "{\"type\":\"record\",\"name\":\"commercialrating\",\"fields\":[{\"name\":\"brand\",\"type\":{\"type\": \"string\",\"arg.properties\":{\"options\":[\"Acme\",\"Globex\"]}}},{\"name\":\"duration\",\"type\":{\"type\":\"int\",\"arg.properties\":{\"options\": [30, 45, 60]}}},{\"name\":\"rating\",\"type\":{\"type\":\"int\",\"arg.properties\":{\"range\":{\"min\":1,\"max\":5}}}}]}",
        "tasks.max": "1",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    },
    "name": "datagen-commercials-json",
    "tasks": [],
    "type": "source"
}

It's always good to then check the status.

http http://localhost:8083/connectors/datagen-commercials-json/status -b

Status output.

{
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "name": "datagen-commercials-json",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect:8083"
        }
    ],
    "type": "source"
}

For a sanity check I'll consume a few commercial ratings events as well.

docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic commercials_json --property print.key=true

Example output, which will obviously vary each time it's run.

Acme	{"brand":"Acme","duration":60,"rating":1}
Acme	{"brand":"Acme","duration":45,"rating":2}
Globex	{"brand":"Globex","duration":30,"rating":2}
Acme	{"brand":"Acme","duration":30,"rating":4}
Acme	{"brand":"Acme","duration":45,"rating":1}
Acme	{"brand":"Acme","duration":45,"rating":4}
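As a small taste of step 4 (processing the generated data), here is a minimal Python sketch that parses console-consumer style key/value lines like those above and computes an average rating per brand. The sample lines are hard-coded for illustration; a real consumer would read them from the topic:

```python
import json
from collections import defaultdict

# Sample key<TAB>value lines as printed by kafka-console-consumer above
lines = [
    'Acme\t{"brand":"Acme","duration":60,"rating":1}',
    'Acme\t{"brand":"Acme","duration":45,"rating":2}',
    'Globex\t{"brand":"Globex","duration":30,"rating":2}',
    'Acme\t{"brand":"Acme","duration":30,"rating":4}',
]

totals = defaultdict(lambda: [0, 0])  # brand -> [sum of ratings, count]
for line in lines:
    _, value = line.split("\t", 1)   # drop the key, keep the JSON value
    event = json.loads(value)
    totals[event["brand"]][0] += event["rating"]
    totals[event["brand"]][1] += 1

averages = {brand: s / n for brand, (s, n) in totals.items()}
print(averages)
```

The same per-brand aggregation is a natural fit for a Kafka Streams or ksqlDB application once you outgrow batch-style scripts.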

To finish this section I delete the connector.

http DELETE http://localhost:8083/connectors/datagen-commercials-json -b

Working with Avro Schema and Confluent Schema Registry

In this last practical section I recreate the commercial ratings data generation connector to utilize the Avro serialization format along with the Confluent Schema Registry. As most readers will likely already know, the Confluent Schema Registry is part of the Confluent Platform and was thus also included as a service in the Docker Compose YAML definition, exposing port 8081 for its REST API. The REST API is reachable from the host machine at localhost:8081 and via the schema-registry hostname from the other Docker services.

In order to inform the kafka-connect-datagen plugin to use Avro and the Schema Registry I simply specify the io.confluent.connect.avro.AvroConverter class to be used for the value.converter and point the value.converter.schema.registry.url property to the Schema Registry via port and hostname accessible from within the Docker network.

I place this configuration in a file named datagen-avro-commercials-config.json

{
  "name": "datagen-commercials-avro",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "commercials_avro",
    "schema.string": "{\"type\":\"record\",\"name\":\"commercialrating\",\"fields\":[{\"name\":\"brand\",\"type\":{\"type\": \"string\",\"arg.properties\":{\"options\":[\"Acme\",\"Globex\"]}}},{\"name\":\"duration\",\"type\":{\"type\":\"int\",\"arg.properties\":{\"options\": [30, 45, 60]}}},{\"name\":\"rating\",\"type\":{\"type\":\"int\",\"arg.properties\":{\"range\":{\"min\":1,\"max\":5}}}}]}",
    "schema.keyfield": "brand",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 1000,
    "tasks.max": "1"
  }
}


Then submitting the config to the Kafka Connect service to create and initiate a Connect worker task is the same as before.

http POST http://localhost:8083/connectors @datagen-avro-commercials-config.json -b

To test the ability to work with the data I now need an Avro-enabled consumer such as kafka-avro-console-consumer. Since the Confluent Platform tools were never installed on the host in this Docker-only setup, I run it from inside the schema-registry container, using the broker's internal listener address.

docker exec -it schema-registry kafka-avro-console-consumer --bootstrap-server broker:29092 \
     --topic commercials_avro --property schema.registry.url=http://localhost:8081

With output similar to the following (values will vary).

{"brand":"Acme","duration":30,"rating":4}
{"brand":"Globex","duration":45,"rating":3}
{"brand":"Globex","duration":60,"rating":4}
{"brand":"Acme","duration":60,"rating":2}
{"brand":"Globex","duration":30,"rating":1}
{"brand":"Acme","duration":45,"rating":2}
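Behind the scenes the AvroConverter registered the value schema with the Schema Registry. Under the registry's default TopicNameStrategy, subject names are derived from the topic name, which this tiny sketch makes explicit (the helper function is mine, for illustration only):

```python
def value_subject(topic):
    # Default TopicNameStrategy: a topic's value schema is registered under
    # the subject "<topic>-value" (key schemas would use "<topic>-key")
    return topic + "-value"

print(value_subject("commercials_avro"))  # commercials_avro-value
```

You can confirm the registered subject from the host with `http http://localhost:8081/subjects`.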

And finally clean up by deleting the connector.

http DELETE http://localhost:8083/connectors/datagen-commercials-avro


Conclusion

In this How To article I introduced using the kafka-connect-datagen plugin within Docker for the Kafka Connect service, a prominent component of the Kafka ecosystem. I presented how to use one of the preconfigured quickstart schemas as well as how to create a custom schema and integrate it with the Confluent Schema Registry.

As always, thanks for reading and please do not hesitate to critique or comment below.
