---
# Single-broker Kafka stack (Confluent Platform 6.1.1):
# ZooKeeper + one Kafka broker + Schema Registry.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # 29092: internal listener for other containers; 9092: host access.
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Containers reach the broker as broker:29092; host clients use localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      # Replication factors of 1 are required for a single-broker cluster.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
docker-compose up -d
$ docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
docker exec -it broker /bin/bash
docker exec -it schema-registry /bin/bash
kafka-topics --bootstrap-server localhost:9092 --create --topic pets --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server localhost:9092 --list
__consumer_offsets
_schemas
people
pets
kafka-topics --bootstrap-server localhost:9092 --describe --topic people
Topic: people PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: people Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: people Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: people Partition: 2 Leader: 1 Replicas: 1 Isr: 1
kafka-topics --bootstrap-server localhost:9092 --create --topic people.promotions --partitions 1 --replication-factor 1 \
--config retention.ms=360000 --config cleanup.policy=compact
kafka-configs --bootstrap-server localhost:9092 --describe --all --topic people.promotions
Output.
All configs for topic people.promotions are:
compression.type=producer sensitive=false synonyms={DEFAULT_CONFIG:compression.type=producer}
leader.replication.throttled.replicas= sensitive=false synonyms={}
message.downconversion.enable=true sensitive=false synonyms={DEFAULT_CONFIG:log.message.downconversion.enable=true}
min.insync.replicas=2 sensitive=false synonyms={STATIC_BROKER_CONFIG:min.insync.replicas=2, DEFAULT_CONFIG:min.insync.replicas=1}
segment.jitter.ms=0 sensitive=false synonyms={}
cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
flush.ms=9223372036854775807 sensitive=false synonyms={}
follower.replication.throttled.replicas= sensitive=false synonyms={}
segment.bytes=1073741824 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.bytes=1073741824}
retention.ms=360000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=360000}
flush.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
message.format.version=2.7-IV2 sensitive=false synonyms={DEFAULT_CONFIG:log.message.format.version=2.7-IV2}
max.compaction.lag.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.max.compaction.lag.ms=9223372036854775807}
file.delete.delay.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.delete.delay.ms=60000}
max.message.bytes=1048588 sensitive=false synonyms={DEFAULT_CONFIG:message.max.bytes=1048588}
min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
message.timestamp.type=CreateTime sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.type=CreateTime}
preallocate=false sensitive=false synonyms={DEFAULT_CONFIG:log.preallocate=false}
min.cleanable.dirty.ratio=0.5 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
index.interval.bytes=4096 sensitive=false synonyms={DEFAULT_CONFIG:log.index.interval.bytes=4096}
unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
retention.bytes=-1 sensitive=false synonyms={DEFAULT_CONFIG:log.retention.bytes=-1}
delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
segment.ms=604800000 sensitive=false synonyms={}
message.timestamp.difference.max.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.difference.max.ms=9223372036854775807}
segment.index.bytes=10485760 sensitive=false synonyms={DEFAULT_CONFIG:log.index.size.max.bytes=10485760}
kafka-console-producer --bootstrap-server localhost:9092 --topic people
>{"name":"Stewie", "show": "Family Guy"}
>{"name":"Popeye", "show": "Popeye"}
>{"name":"Bart", "show": "The Simpsons"}
>{"name":"Fred", "show": "The Flintstones"}
>{"name":"Shaggy", "show": "Scooby-Doo"}
kafka-console-producer --bootstrap-server localhost:9092 --topic pets --property "parse.key=true" --property "key.separator=|"
>baxter|{"name": "Baxter", "movie": "Anchorman"}
>marley|{"name": "Marley", "movie": "Marley and Me"}
>toothless|{"name": "Toothless", "movie": "How To Train Your Dragon"}
>willy|{"name": "Willy", "movie": "Free Willy"}
>babe|{"name": "Babe", "movie": "Babe"}
>hooch|{"name": "Hooch", "movie": "Turner & Hooch"}
kafka-console-consumer --bootstrap-server localhost:9092 --topic people --from-beginning
Output.
{"name":"Stewie", "show": "Family Guy"}
{"name":"Fred", "show": "The Flintstones"}
{"name":"Bart", "show": "The Simpsons"}
{"name":"Shaggy", "show": "Scooby-Doo"}
{"name":"Popeye", "show": "Popeye"}
^CProcessed a total of 5 messages
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets --from-beginning --property print.key=true
Output.
baxter {"name": "Baxter", "movie": "Anchorman"}
marley {"name": "Marley", "movie": "Marley and Me"}
toothless {"name": "Toothless", "movie": "How To Train Your Dragon"}
willy {"name": "Willy", "movie": "Free Willy"}
babe {"name": "Babe", "movie": "Babe"}
hooch {"name": "Hooch", "movie": "Turner & Hooch"}
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets --from-beginning \
--group adam-pet-consumer --property print.offset=true
Output.
Offset:0 {"name": "Baxter", "movie": "Anchorman"}
Offset:1 {"name": "Marley", "movie": "Marley and Me"}
Offset:2 {"name": "Toothless", "movie": "How To Train Your Dragon"}
Offset:3 {"name": "Willy", "movie": "Free Willy"}
Offset:4 {"name": "Babe", "movie": "Babe"}
Offset:5 {"name": "Hooch", "movie": "Turner & Hooch"}
kafka-console-consumer --bootstrap-server localhost:9092 --topic people --from-beginning \
--group adam-people-consumer --property print.timestamp=true
Output.
CreateTime:1645938589986 {"name":"Stewie", "show": "Family Guy"}
CreateTime:1645938688396 {"name":"Fred", "show": "The Flintstones"}
CreateTime:1645938656642 {"name":"Bart", "show": "The Simpsons"}
CreateTime:1645938735597 {"name":"Shaggy", "show": "Scooby-Doo"}
CreateTime:1645938624686 {"name":"Popeye", "show": "Popeye"}
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
Output.
Consumer group 'adam-people-consumer' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
adam-people-consumer people 0 2 2 0 - - -
adam-people-consumer people 2 2 2 0 - - -
adam-people-consumer people 1 1 1 0 - - -
Consumer group 'adam-pet-consumer' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
adam-pet-consumer pets 0 6 6 0 - - -
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group adam-people-consumer
Output.
Consumer group 'adam-people-consumer' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
adam-people-consumer people 0 2 2 0 - - -
adam-people-consumer people 2 2 2 0 - - -
adam-people-consumer people 1 1 1 0 - - -
kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-pet-consumer \
--reset-offsets --to-earliest --topic people --dry-run
Output.
GROUP TOPIC PARTITION NEW-OFFSET
adam-pet-consumer people 0 0
adam-pet-consumer people 1 0
adam-pet-consumer people 2 0
kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-pet-consumer \
--reset-offsets --to-earliest --topic people --execute
kafka-consumer-groups --bootstrap-server localhost:9092 --group adam-people-consumer \
--reset-offsets --to-offset 1 --topic people:0 --execute
Output.
GROUP TOPIC PARTITION NEW-OFFSET
adam-people-consumer people 0 1
Yet another way to view the consumer offsets is to fetch the contents of __consumer_offsets
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list people
Output.
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"people-0","size":217,"offsetLag":0,"isFuture":false},{"partition":"people-2","size":214,"offsetLag":0,"isFuture":false},{"partition":"people-1","size":103,"offsetLag":0,"isFuture":false}]}]}]}
kafka-log-dirs --bootstrap-server localhost:9092 --describe \
--topic-list people | grep -oP '(?<=size":)\d+' | awk '{ sum += $1 } END { print sum }'
kafka-console-consumer --bootstrap-server localhost:9092 --topic pets \
--offset 2 --max-messages 3 --partition 0 --property print.offset=true
Output.
Offset:2 {"name": "Toothless", "movie": "How To Train Your Dragon"}
Offset:3 {"name": "Willy", "movie": "Free Willy"}
Offset:4 {"name": "Babe", "movie": "Babe"}
Processed a total of 3 messages
kafka-topics --bootstrap-server localhost:9092 --delete --topic people
kafka-topics --bootstrap-server localhost:9092 --create --topic dinosaurs --replication-factor 1 --partitions 1
# Write the Avro schema for Dinosaur records to ./dinos.avsc.
# The heredoc delimiter is quoted ('EOF') so the shell performs no
# parameter/command expansion on the schema body.
cat <<'EOF' >./dinos.avsc
{
  "type": "record",
  "namespace": "dinos",
  "name": "Dinosaur",
  "fields": [
    {
      "name": "name",
      "type": "string",
      "doc": "Name of Dinosaur"
    },
    {
      "name": "weight_low",
      "type": "int",
      "doc": "Weight lower bounds in metric tons"
    },
    {
      "name": "weight_high",
      "type": "int",
      "doc": "Weight upper bounds in metric tons"
    },
    {
      "name": "length_low",
      "type": "int",
      "doc": "Length lower bounds in meters"
    },
    {
      "name": "length_high",
      "type": "int",
      "doc": "Length upper bounds in meters"
    }
  ]
}
EOF
kafka-avro-console-producer --topic dinosaurs --broker-list broker:29092 \
--property schema.registry.url="http://localhost:8081" \
--property value.schema="$(< dinos.avsc)"
>{"name":"Ankylosaurus", "weight_low": 5, "weight_high": 8, "length_low": 6, "length_high": 8}
>{"name":"Carnotaurus", "weight_low": 1, "weight_high": 2, "length_low": 7, "length_high": 8}
>{"name":"Tyrannosaurus", "weight_low": 8, "weight_high": 14, "length_low": 10, "length_high": 12}
>{"name":"Triceratops", "weight_low": 10, "weight_high": 12, "length_low": 7, "length_high": 9}
kafka-avro-console-consumer --bootstrap-server broker:29092 --topic dinosaurs --from-beginning \
--property schema.registry.url="http://localhost:8081"
Output.
{"name":"Ankylosaurus","weight_low":5,"weight_high":8,"length_low":6,"length_high":8}
{"name":"Carnotaurus","weight_low":1,"weight_high":2,"length_low":7,"length_high":8}
{"name":"Tyrannosaurus","weight_low":8,"weight_high":14,"length_low":10,"length_high":12}
{"name":"Triceratops","weight_low":10,"weight_high":12,"length_low":7,"length_high":9}
In this short article I provided working examples of a number of different Kafka CLI commands I've found useful during my time working with Apache Kafka.