Kafka CLI Cheat Sheet

By Adam McQuistan in DevOps  02/25/2022

Introduction

In this article I collect a number of Kafka CLI commands that I've found useful. My goal is to capture them here as a cheat sheet for myself and others to draw from.

To make it easy to experiment with and verify these commands in a low-effort, low-impact environment, I'll provide the following docker-compose.yml definition, which uses the publicly available Confluent community Docker images for Kafka, Zookeeper, and Schema Registry.
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Fire up the Docker Compose services with the following.
docker-compose up
This will produce the following docker services.
$ docker-compose ps
     Name                  Command            State                        Ports
------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run   Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
schema-registry   /etc/confluent/docker/run   Up      0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
I'll be accessing the Kafka CLI tools from these running docker containers.

To exec into the broker container use the following.
docker exec -it broker /bin/bash
 
Similarly to exec into the schema-registry container use the following.
docker exec -it schema-registry /bin/bash
 

Creating, Listing, and Describing Kafka Topics


Create people topic with 3 partitions from within broker container.
kafka-topics --bootstrap-server localhost:9092 --create --topic people --replication-factor 1 --partitions 3
 
Create pets topic with 1 partition from within broker container.
kafka-topics --bootstrap-server localhost:9092 --create --topic pets --replication-factor 1 --partitions 1
 
List topics from within the broker container.
kafka-topics --bootstrap-server localhost:9092 --list
Output.
__consumer_offsets
_schemas
people
pets

 

Describe the people topic.
kafka-topics --bootstrap-server localhost:9092 --describe --topic people
Output.
Topic: people	PartitionCount: 3	ReplicationFactor: 1	Configs:
	Topic: people	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: people	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: people	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
 
To create a topic with a specific retention period and/or as a compacted topic, pass the relevant settings with --config.
kafka-topics --bootstrap-server localhost:9092 --create --topic people.promotions --partitions 1 --replication-factor 1 \
  --config retention.ms=360000 --config cleanup.policy=compact
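
Note that retention.ms is expressed in milliseconds, so the 360000 used above works out to six minutes. As a minimal sketch (the to_ms helper is hypothetical and not part of the Kafka tooling), shell arithmetic can derive the value from more readable units.

```shell
# Hypothetical helper: convert a duration to milliseconds for retention.ms.
# Usage: to_ms <amount> <unit>   where unit is s, m, h, or d
to_ms() {
  case "$2" in
    s) echo $(( $1 * 1000 )) ;;
    m) echo $(( $1 * 60 * 1000 )) ;;
    h) echo $(( $1 * 60 * 60 * 1000 )) ;;
    d) echo $(( $1 * 24 * 60 * 60 * 1000 )) ;;
    *) echo "unknown unit: $2" >&2; return 1 ;;
  esac
}

to_ms 6 m    # prints 360000, the value used above
to_ms 7 d    # prints 604800000
```

The result can be passed straight to the flag, for example --config retention.ms=$(to_ms 1 h).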
 
To get a detailed look at all the configs used by a particular topic, use the kafka-configs CLI tool.
kafka-configs --bootstrap-server localhost:9092 --describe --all --topic people.promotions

Output.

All configs for topic people.promotions are:
  compression.type=producer sensitive=false synonyms={DEFAULT_CONFIG:compression.type=producer}
  leader.replication.throttled.replicas= sensitive=false synonyms={}
  message.downconversion.enable=true sensitive=false synonyms={DEFAULT_CONFIG:log.message.downconversion.enable=true}
  min.insync.replicas=2 sensitive=false synonyms={STATIC_BROKER_CONFIG:min.insync.replicas=2, DEFAULT_CONFIG:min.insync.replicas=1}
  segment.jitter.ms=0 sensitive=false synonyms={}
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  flush.ms=9223372036854775807 sensitive=false synonyms={}
  follower.replication.throttled.replicas= sensitive=false synonyms={}
  segment.bytes=1073741824 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=360000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=360000}
  flush.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
  message.format.version=2.7-IV2 sensitive=false synonyms={DEFAULT_CONFIG:log.message.format.version=2.7-IV2}
  max.compaction.lag.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.max.compaction.lag.ms=9223372036854775807}
  file.delete.delay.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.delete.delay.ms=60000}
  max.message.bytes=1048588 sensitive=false synonyms={DEFAULT_CONFIG:message.max.bytes=1048588}
  min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
  message.timestamp.type=CreateTime sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.type=CreateTime}
  preallocate=false sensitive=false synonyms={DEFAULT_CONFIG:log.preallocate=false}
  min.cleanable.dirty.ratio=0.5 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
  index.interval.bytes=4096 sensitive=false synonyms={DEFAULT_CONFIG:log.index.interval.bytes=4096}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  retention.bytes=-1 sensitive=false synonyms={DEFAULT_CONFIG:log.retention.bytes=-1}
  delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
  segment.ms=604800000 sensitive=false synonyms={}
  message.timestamp.difference.max.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.difference.max.ms=9223372036854775807}
  segment.index.bytes=10485760 sensitive=false synonyms={DEFAULT_CONFIG:log.index.size.max.bytes=10485760}

Basic Schemaless Message Producing and Consuming


From the broker container publish some people data.
 
kafka-console-producer --bootstrap-server localhost:9092 --topic people
>{"name":"Stewie", "show": "Family Guy"}
>{"name":"Popeye", "show": "Popeye"}
>{"name":"Bart", "show": "The Simpsons"}
>{"name":"Fred", "show": "The Flintstones"}
>{"name":"Shaggy", "show": "Scooby-Doo"}
CTRL+C to exit producer session
 
From the broker container publish some pets with an explicit key separated by a pipe character.
kafka-console-producer --bootstrap-server localhost:9092 --topic pets --property "parse.key=true" --property "key.separator=|"
>baxter|{"name": "Baxter", "movie": "Anchorman"}
>marley|{"name": "Marley", "movie": "Marley and Me"}
>toothless|{"name": "Toothless", "movie": "How To Train Your Dragon"}
>willy|{"name": "Willy", "movie": "Free Willy"}
>babe|{"name": "Babe", "movie": "Babe"}
>hooch|{"name": "Hooch", "movie": "Turner & Hooch"}
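
With parse.key=true, each input line is split at the key.separator into a key and a value. The following sketch only mimics that split with plain shell parameter expansion (it is an illustration, not how the producer is implemented, and the variable names are my own). It can be handy for sanity-checking lines before feeding them to the producer.

```shell
line='baxter|{"name": "Baxter", "movie": "Anchorman"}'

key=${line%%|*}     # everything before the first pipe
value=${line#*|}    # everything after the first pipe

echo "key=$key"
echo "value=$value"
```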
 
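From the broker container consume all people messages from the beginning. Because the people topic has 3 partitions, messages may be printed in a different order than they were produced; ordering is only guaranteed within a partition.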
kafka-console-consumer --bootstrap-server localhost:9092 --topic people --from-beginning

Output.

{"name":"Stewie", "show": "Family Guy"}
{"name":"Fred", "show": "The Flintstones"}
{"name":"Bart", "show": "The Simpsons"}
{"name":"Shaggy", "show": "Scooby-Doo"}
{"name":"Popeye", "show": "Popeye"}
^CProcessed a total of 5 messages
CTRL+C to exit consumer session
 
From broker container consume all pet messages and print the key of messages.
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets --from-beginning --property print.key=true

Output.

baxter	{"name": "Baxter", "movie": "Anchorman"}
marley	{"name": "Marley", "movie": "Marley and Me"}
toothless	{"name": "Toothless", "movie": "How To Train Your Dragon"}
willy	{"name": "Willy", "movie": "Free Willy"}
babe	{"name": "Babe", "movie": "Babe"}
hooch	{"name": "Hooch", "movie": "Turner & Hooch"}

 

Specify an explicit consumer group id while consuming from the pets topic and print each message's offset.
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets --from-beginning \
    --group adam-pet-consumer --property print.offset=true

Output.

Offset:0	{"name": "Baxter", "movie": "Anchorman"}
Offset:1	{"name": "Marley", "movie": "Marley and Me"}
Offset:2	{"name": "Toothless", "movie": "How To Train Your Dragon"}
Offset:3	{"name": "Willy", "movie": "Free Willy"}
Offset:4	{"name": "Babe", "movie": "Babe"}
Offset:5	{"name": "Hooch", "movie": "Turner & Hooch"}
 
Specify an explicit consumer group id for people topic and print message timestamp.
kafka-console-consumer --bootstrap-server localhost:9092 --topic people --from-beginning \
    --group adam-people-consumer --property print.timestamp=true

Output.

CreateTime:1645938589986	{"name":"Stewie", "show": "Family Guy"}
CreateTime:1645938688396	{"name":"Fred", "show": "The Flintstones"}
CreateTime:1645938656642	{"name":"Bart", "show": "The Simpsons"}
CreateTime:1645938735597	{"name":"Shaggy", "show": "Scooby-Doo"}
CreateTime:1645938624686	{"name":"Popeye", "show": "Popeye"}
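
The CreateTime values are Unix epoch timestamps in milliseconds. Here is a small sketch for turning one into a readable UTC time, assuming GNU date (the -d @seconds form is not available in BSD/macOS date) and using a hypothetical helper name.

```shell
# Hypothetical helper: convert a Kafka CreateTime (epoch milliseconds) to ISO 8601 UTC.
# Assumes GNU date, which accepts "-d @<epoch seconds>".
ms_to_utc() {
  date -u -d "@$(( $1 / 1000 ))" +%Y-%m-%dT%H:%M:%SZ
}

ms_to_utc 1645938589986   # prints 2022-02-27T05:09:49Z
```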

 

View and Reset Consumer Offsets

The __consumer_offsets topic stores the message offset that each consumer group should consume next. In the broker container run the following to see the current position of each consumer group for each topic partition.
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups

Output.

Consumer group 'adam-people-consumer' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-people-consumer people          0          2               2               0               -               -               -
adam-people-consumer people          2          2               2               0               -               -               -
adam-people-consumer people          1          1               1               0               -               -               -

Consumer group 'adam-pet-consumer' has no active members.

GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-pet-consumer pets            0          6               6               0               -               -               -

 

Or do this for a specific group like adam-people-consumer.
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group adam-people-consumer

Output.

Consumer group 'adam-people-consumer' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-people-consumer people          0          2               2               0               -               -               -
adam-people-consumer people          2          2               2               0               -               -               -
adam-people-consumer people          1          1               1               0               -               -               -
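
The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET for each partition. As a rough sketch (the total_lag function name is my own, and it assumes the column layout shown above), awk can total the lag for one group from the --describe output.

```shell
# Hypothetical helper: sum the lag for one consumer group.
# Reads `kafka-consumer-groups --describe` output on stdin and assumes the layout above:
# column 1 is GROUP, column 4 is CURRENT-OFFSET, column 5 is LOG-END-OFFSET.
# Rows without a numeric CURRENT-OFFSET (shown as "-") are skipped.
total_lag() {
  awk -v group="$1" '$1 == group && $4 ~ /^[0-9]+$/ { lag += $5 - $4 } END { print lag + 0 }'
}

# Sample rows copied from the output above; prints 0 because the group is caught up.
total_lag adam-people-consumer <<'EOF'
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-people-consumer people          0          2               2               0               -               -               -
adam-people-consumer people          2          2               2               0               -               -               -
adam-people-consumer people          1          1               1               0               -               -               -
EOF
```

Against a live broker the same function can be fed directly by piping the --describe command for the group into total_lag adam-people-consumer.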
 
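Reset the offsets of the adam-pet-consumer group for the people topic to the earliest available offset. Use --dry-run to preview the result of the reset, or --execute to apply it. Offsets can only be reset while the group has no active members.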
kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-pet-consumer \
    --reset-offsets --to-earliest --topic people --dry-run

Output.

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
adam-pet-consumer              people                         0          0
adam-pet-consumer              people                         1          0
adam-pet-consumer              people                         2          0
Then do it for real.
kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-pet-consumer \
    --reset-offsets --to-earliest --topic people --execute
 
You can also reset the offset for a specific topic and partition. For example, to reset partition 0 of the people topic to offset 1 for the adam-people-consumer group, run the following.
kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-people-consumer \
    --reset-offsets --to-offset 1 --topic people:0 --execute

Output.

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
adam-people-consumer           people                         0          1
 

Yet another way to view the consumer offsets is to read the contents of the __consumer_offsets topic directly.

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

Viewing the Amount of Data in a Topic

In broker container run this to see how many bytes are in people topic.
kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list people

Output.

Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"people-0","size":217,"offsetLag":0,"isFuture":false},{"partition":"people-2","size":214,"offsetLag":0,"isFuture":false},{"partition":"people-1","size":103,"offsetLag":0,"isFuture":false}]}]}]}

 

You can also use grep and awk to sum the size across partitions.
kafka-log-dirs --bootstrap-server localhost:9092 --describe \
    --topic-list people | grep -oP '(?<=size":)\d+' | awk '{ sum += $1 } END { print sum }'
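
If the -P (PCRE) option is not available in your grep, or you want a per-partition breakdown, a variation along the following lines may help. This is a sketch run against the sample JSON shown above rather than a live broker, and the partition_sizes function name is hypothetical.

```shell
# Hypothetical helper: print "<partition> <size in bytes>" for each partition
# found in kafka-log-dirs JSON read from stdin.
partition_sizes() {
  grep -o '"partition":"[^"]*","size":[0-9]*' | awk -F'[":,]+' '{ print $3, $5 }'
}

# JSON line copied from the sample output above.
sample='{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"people-0","size":217,"offsetLag":0,"isFuture":false},{"partition":"people-2","size":214,"offsetLag":0,"isFuture":false},{"partition":"people-1","size":103,"offsetLag":0,"isFuture":false}]}]}]}'

printf '%s\n' "$sample" | partition_sizes
# people-0 217
# people-2 214
# people-1 103

printf '%s\n' "$sample" | partition_sizes | awk '{ sum += $2 } END { print sum }'
# 534
```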
 

Consuming From a Specific Offset and Number of Messages

From within the broker container run the following to consume 3 messages from partition 0 of the pets topic starting at offset 2 (that is, offsets 2 through 4).
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets \
    --offset 2 --max-messages 3 --partition 0 --property print.offset=true

Output.

Offset:2	{"name": "Toothless", "movie": "How To Train Your Dragon"}
Offset:3	{"name": "Willy", "movie": "Free Willy"}
Offset:4	{"name": "Babe", "movie": "Babe"}
Processed a total of 3 messages
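
Since --max-messages is a count rather than an end offset, reading an inclusive range of offsets takes a little arithmetic. A minimal sketch, using a hypothetical max_messages helper:

```shell
# Hypothetical helper: number of messages in the inclusive offset range [first, last].
max_messages() {
  echo $(( $2 - $1 + 1 ))
}

max_messages 2 4   # prints 3, matching the command above
max_messages 2 5   # prints 4, the count needed to also read offset 5
```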
 

Deleting a Topic

Run the following to delete the people topic in broker container.
kafka-topics --bootstrap-server localhost:9092 --delete --topic people
 

Producing and Consuming Avro Messages

Through either installing Confluent platform or using the Confluent Schema Registry Docker image you can work with Avro enabled console producer and consumer tools. In this article I've been working with a Dockerized Kafka environment complete with the Confluent Schema Registry so I'll use that.

First create a topic named dinosaurs from within the broker container.
kafka-topics --bootstrap-server localhost:9092 --create --topic dinosaurs --replication-factor 1 --partitions 1
 
Then from within the schema-registry container create the following Avro Schema Definition File named dinos.avsc
cat <<EOF >./dinos.avsc
{
  "type":"record",
  "namespace":"dinos",
  "name":"Dinosaur",
  "fields": [
    {
      "name":"name",
      "type":"string",
      "doc":"Name of Dinosaur"
    },
    {
      "name":"weight_low",
      "type":"int",
      "doc":"Weight lower bounds in metric tons"
    },
    {
      "name":"weight_high",
      "type":"int",
      "doc":"Weight upper bounds in metric tons"
    },
    {
      "name":"length_low",
      "type":"int",
      "doc":"Length lower bounds in meters"
    },
    {
      "name":"length_high",
      "type":"int",
      "doc":"Length upper bounds in meters"
    }
  ]
}
EOF
 
Now, still within the schema-registry container, produce some dinosaurs, specifying the URL of the Schema Registry (localhost).
kafka-avro-console-producer --topic dinosaurs --broker-list broker:29092 \
  --property schema.registry.url="http://localhost:8081" \
  --property value.schema="$(< dinos.avsc)"
>{"name":"Ankylosaurus", "weight_low": 5, "weight_high": 8, "length_low": 6, "length_high": 8}
>{"name":"Carnotaurus", "weight_low": 1, "weight_high": 2, "length_low": 7, "length_high": 8}
>{"name":"Tyrannosaurus", "weight_low": 8, "weight_high": 14, "length_low": 10, "length_high": 12}
>{"name":"Triceratops", "weight_low": 10, "weight_high": 12, "length_low": 7, "length_high": 9}
CTRL+C to exit producer
 
Then run the following to consume the Avro-based messages.
kafka-avro-console-consumer --bootstrap-server broker:29092 --topic dinosaurs --from-beginning \
  --property schema.registry.url="http://localhost:8081"

Output.

{"name":"Ankylosaurus","weight_low":5,"weight_high":8,"length_low":6,"length_high":8}
{"name":"Carnotaurus","weight_low":1,"weight_high":2,"length_low":7,"length_high":8}
{"name":"Tyrannosaurus","weight_low":8,"weight_high":14,"length_low":10,"length_high":12}
{"name":"Triceratops","weight_low":10,"weight_high":12,"length_low":7,"length_high":9}

Conclusion

In this short article I provided working examples of a number of different Kafka CLI commands I've found useful during my time working with Apache Kafka.
