---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081docker-compose up$ docker-compose ps
     Name                  Command            State                        Ports
------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run   Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
schema-registry   /etc/confluent/docker/run   Up      0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcpdocker exec -it broker /bin/bashdocker exec -it schema-registry /bin/bashkafka-topics --bootstrap-server localhost:9092 --create --topic pets --replication-factor 1 --partitions 1kafka-topics --bootstrap-server localhost:9092 --list__consumer_offsets
_schemas
people
pets
kafka-topic --bootstrap-server localhost:9092 --describe --topic peopleTopic: people	PartitionCount: 3	ReplicationFactor: 1	Configs:
	Topic: people	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: people	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: people	Partition: 2	Leader: 1	Replicas: 1	Isr: 1kafka-topics --bootstrap-server localhost:9092 --create --topic people.promotions --partitions 1 --replication-factor 1 \
  --config retention.ms=360000 --config cleanup.policy=compactkafka-configs --bootstrap-server localhost:9092 --describe --all --topic people.promotionsOutput.
All configs for topic people.promotions are:
  compression.type=producer sensitive=false synonyms={DEFAULT_CONFIG:compression.type=producer}
  leader.replication.throttled.replicas= sensitive=false synonyms={}
  message.downconversion.enable=true sensitive=false synonyms={DEFAULT_CONFIG:log.message.downconversion.enable=true}
  min.insync.replicas=2 sensitive=false synonyms={STATIC_BROKER_CONFIG:min.insync.replicas=2, DEFAULT_CONFIG:min.insync.replicas=1}
  segment.jitter.ms=0 sensitive=false synonyms={}
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  flush.ms=9223372036854775807 sensitive=false synonyms={}
  follower.replication.throttled.replicas= sensitive=false synonyms={}
  segment.bytes=1073741824 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=360000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=360000}
  flush.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
  message.format.version=2.7-IV2 sensitive=false synonyms={DEFAULT_CONFIG:log.message.format.version=2.7-IV2}
  max.compaction.lag.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.max.compaction.lag.ms=9223372036854775807}
  file.delete.delay.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.delete.delay.ms=60000}
  max.message.bytes=1048588 sensitive=false synonyms={DEFAULT_CONFIG:message.max.bytes=1048588}
  min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
  message.timestamp.type=CreateTime sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.type=CreateTime}
  preallocate=false sensitive=false synonyms={DEFAULT_CONFIG:log.preallocate=false}
  min.cleanable.dirty.ratio=0.5 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
  index.interval.bytes=4096 sensitive=false synonyms={DEFAULT_CONFIG:log.index.interval.bytes=4096}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  retention.bytes=-1 sensitive=false synonyms={DEFAULT_CONFIG:log.retention.bytes=-1}
  delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
  segment.ms=604800000 sensitive=false synonyms={}
  message.timestamp.difference.max.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.difference.max.ms=9223372036854775807}
  segment.index.bytes=10485760 sensitive=false synonyms={DEFAULT_CONFIG:log.index.size.max.bytes=10485760}kafka-console-producer --bootstrap-server localhost:9092 --topic people
>{"name":"Stewie", "show": "Family Guy"}
>{"name":"Popeye", "show": "Popeye"}
>{"name":"Bart", "show": "The Simpsons"}
>{"name":"Fred", "show": "The Flintstones"}
>{"name":"Shaggy", "show": "Scooby-Doo"}kafka-console-producer --bootstrap-server localhost:9092 --topic pets --property "parse.key=true" --property "key.separator=|"
>baxter|{"name": "Baxter", "movie": "Anchorman"}
>marley|{"name": "Marley", "movie": "Marley and Me"}
>toothless|{"name": "Toothless", "movie": "How To Train Your Dragon"}
>willy|{"name": "Willy", "movie": "Free Willy"}
>babe|{"name": "Babe", "movie": "Babe"}
>hooch|{"name": "Hooch", "movie": "Turner & Hooch"}kafka-console-consumer --bootstrap-server localhost:9092 --topic people --from-beginningOutput.
{"name":"Stewie", "show": "Family Guy"}
{"name":"Fred", "show": "The Flintstones"}
{"name":"Bart", "show": "The Simpsons"}
{"name":"Shaggy", "show": "Scooby-Doo"}
{"name":"Popeye", "show": "Popeye"}
^CProcessed a total of 5 messageskafka-console-consumer --bootstrap-server localhost:9092 --topic pets --from-beginning --property print.key=trueOutput.
baxter	{"name": "Baxter", "movie": "Anchorman"}
marley	{"name": "Marley", "movie": "Marley and Me"}
toothless	{"name": "Toothless", "movie": "How To Train Your Dragon"}
willy	{"name": "Willy", "movie": "Free Willy"}
babe	{"name": "Babe", "movie": "Babe"}
hooch	{"name": "Hooch", "movie": "Turner & Hooch"}
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets --from-beginning \
    --group adam-pet-consumer --property print.offset=trueOutput.
Offset:0	{"name": "Baxter", "movie": "Anchorman"}
Offset:1	{"name": "Marley", "movie": "Marley and Me"}
Offset:2	{"name": "Toothless", "movie": "How To Train Your Dragon"}
Offset:3	{"name": "Willy", "movie": "Free Willy"}
Offset:4	{"name": "Babe", "movie": "Babe"}
Offset:5	{"name": "Hooch", "movie": "Turner & Hooch"}kafka-console-consumer --bootstrap-server localhost:9092 --topic people --from-beginning \
    --group adam-people-consumer --property print.timestamp=trueOutput.
CreateTime:1645938589986	{"name":"Stewie", "show": "Family Guy"}
CreateTime:1645938688396	{"name":"Fred", "show": "The Flintstones"}
CreateTime:1645938656642	{"name":"Bart", "show": "The Simpsons"}
CreateTime:1645938735597	{"name":"Shaggy", "show": "Scooby-Doo"}
CreateTime:1645938624686	{"name":"Popeye", "show": "Popeye"}
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groupsOutput.
Consumer group 'adam-people-consumer' has no active members.
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-people-consumer people          0          2               2               0               -               -               -
adam-people-consumer people          2          2               2               0               -               -               -
adam-people-consumer people          1          1               1               0               -               -               -
Consumer group 'adam-pet-consumer' has no active members.
GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-pet-consumer pets            0          6               6               0               -               -               -
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group adam-people-consumerOutput.
Consumer group 'adam-people-consumer' has no active members.
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
adam-people-consumer people          0          2               2               0               -               -               -
adam-people-consumer people          2          2               2               0               -               -               -
adam-people-consumer people          1          1               1               0               -               -               -kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-pet-consumer \
    --reset-offsets --to-earliest --topic people --dry-runOutput.
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
adam-pet-consumer              people                         0          0
adam-pet-consumer              people                         1          0
adam-pet-consumer              people                         2          0kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-pet-consumer \
    --reset-offsets --to-earliest --topic people --executekafka-consumer-groups --bootstrap-server localhost:9092 --group adam-people-consumer \
    --reset-offsets --to-offset 1 --topic people:0 --executeOutput.
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
adam-people-consumer           people                         0          1Yet another way to view the consumer offsets is to fetch the contents of __consumer_offsets
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list peopleOutput.
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"people-0","size":217,"offsetLag":0,"isFuture":false},{"partition":"people-2","size":214,"offsetLag":0,"isFuture":false},{"partition":"people-1","size":103,"offsetLag":0,"isFuture":false}]}]}]}
kafka-log-dirs --bootstrap-server localhost:9092 --describe \
    --topic-list people | grep -oP '(?<=size":)\d+' | awk '{ sum += $1 } END { print sum }'kafka-console-consumer --bootstrap-server localhost:9092 --topic pets \
    --offset 2 --max-messages 3 --partition 0 --property print.offset=trueOutput.
Offset:2	{"name": "Toothless", "movie": "How To Train Your Dragon"}
Offset:3	{"name": "Willy", "movie": "Free Willy"}
Offset:4	{"name": "Babe", "movie": "Babe"}
Processed a total of 3 messageskafka-topics --bootstrap-server localhost:9092 --delete --topic peoplekafka-topics --bootstrap-server localhost:9092 --create --topic dinosaurs --replication-factor 1 --partitions 1cat <<EOF >./dinos.avsc
{
  "type":"record",
  "namespace":"dinos",
  "name":"Dinosaur",
  "fields": [
    {
      "name":"name",
      "type":"string",
      "doc":"Name of Dinosaur"
    },
    {
      "name":"weight_low",
      "type":"int",
      "doc":"Weight lower bounds in metric tons"
    },
    {
      "name":"weight_high",
      "type":"int",
      "doc":"Weight upper bounds in metric tons"
    },
    {
      "name":"length_low",
      "type":"int",
      "doc":"Length lower bounds in meters"
    },
    {
      "name":"length_high",
      "type":"int",
      "doc":"Length upper bounds in meters"
    }
  ]
}
EOFkafka-avro-console-producer --topic dinosaurs --broker-list broker:29092 \
  --property schema.registry.url="http://localhost:8081" \
  --property value.schema="$(< dinos.avsc)"
>{"name":"Ankylosaurus", "weight_low": 5, "weight_high": 8, "length_low": 6, "length_high": 8}
>{"name":"Carnotaurus", "weight_low": 1, "weight_high": 2, "length_low": 7, "length_high": 8}
>{"name":"Tyrannosaurus", "weight_low": 8, "weight_high": 14, "length_low": 10, "length_high": 12}
>{"name":"Triceratops", "weight_low": 10, "weight_high": 12, "length_low": 7, "length_high": 9}kafka-avro-console-consumer --bootstrap-server broker:29092 --topic dinosaurs --from-beginning \
  --property schema.registry.url="http://localhost:8081"Output.
{"name":"Ankylosaurus","weight_low":5,"weight_high":8,"length_low":6,"length_high":8}
{"name":"Carnotaurus","weight_low":1,"weight_high":2,"length_low":7,"length_high":8}
{"name":"Tyrannosaurus","weight_low":8,"weight_high":14,"length_low":10,"length_high":12}
{"name":"Triceratops","weight_low":10,"weight_high":12,"length_low":7,"length_high":9}In this short article I provided working examples of a number of different Kafka CLI commands I've found useful during my time working with Apache Kafka.