Handy tool for quick producing/consuming Kafka messages and more.
1. What is kafkacat ?
Citing official README:
kafkacat
is a generic non-JVM producer and consumer for Apache Kafka >=0.8, think of it as a netcat for Kafka.
Simply put — it’s very handy tool to work with Kafka.
To understand what exactly the tools does, one should try it out. So, below I give simple walkthrough, that demonstrates how to set-up cluster with 1 broker and produce/consume messages with kafkacat
. Without further ado — let’s start.
2. Setting-up local Kafka cluster with one broker
Let’s quickly bootstrap local Kafka cluster:
version: "3"
services:
kafka-broker:
image: confluentinc/cp-kafka:latest
container_name: kafka-broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_MESSAGE_MAX_BYTES: 10000000
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
Note: KAFKA_AUTO_CREATE_TOPICS_ENABLE
is set to "true"
deliberately, so we can experiment with producing /consuming without setting up topics beforehand.
3. Using kafkacat
kafkacat
docker image is available on Docker Hub. So, if you have docker installed — you can spend no time on installation, and just start using it.
3.1. Metadata mode
List brokers and topics in cluster
This will print cluster metadata:
$ docker run --tty --rm --interactive \
--network=host \
confluentinc/cp-kafkacat \
kafkacat -b localhost:9092 -L
-b
— broker host:port
-L
— metadata mode (will list brokers and topics in the cluster)
The above command outputs:
Metadata for all topics (from broker 1: localhost:9092/1):
1 brokers:
broker 1 at localhost:9092
1 topics:
topic "__confluent.support.metrics" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
3.2. Producer mode
Producing messages from a file
Let’s create file with messages:
$ cat >> /tmp/orders.txt <<EOF
> 1:{"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
> 2:{"order_id":2,"order_ts":1534772605276,"total_amount":3.32,"customer_name":"Sarah Black"}
> 3:{"order_id":3,"order_ts":1534772742276,"total_amount":21.00,"customer_name":"Emma Turner"}
> EOF
Next, lets send messages from file to orders
topic:
$ docker run --tty --rm --interactive \
--network=host \
--volume /tmp/orders.txt:/data/orders.txt \
confluentinc/cp-kafkacat \
kafkacat -b locahost:9092 -t orders -P -l /data/orders.txt
-b
— broker host:port
-t
— topic to produce to-P
— produce mode-l
— send messages from a file (only one file allowed)
Producing messages inline
This will send three messages, with given key:value
:
$ docker run --interactive --rm \
--network=host \
confluentinc/cp-kafkacat \
kafkacat -b localhost:9092 -t orders -K: -P <<EOF
4:{"order_id":4,"order_ts":1534772801276,"total_amount":11.50,"customer_name":"Alina Smith"}
5:{"order_id":5,"order_ts":1534772905276,"total_amount":13.32,"customer_name":"Alex Black"}
6:{"order_id":6,"order_ts":1534773042276,"total_amount":31.00,"customer_name":"Emma Watson"}
EOF
-b
— broker host:port
-t
— topic to consume from-K:
— print message keys prefixing the message with :
-P
— produce mode<<EOF … EOF
— a here document, that redirects messages to be produced
3.3. Consumer mode
Consuming messages from a topic
$ docker run --tty --rm --interactive \
--network=host \
confluentinc/cp-kafkacat \
kafkacat -C -b localhost:9092 -K: \
-f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n' \
-t orders -c 1
-C
— consume mode-b
— broker host:port
-K:
— print message keys prefixing the message with :
-f
— output formatting string-t
— topic to consume from-c
— exit after producing 1 message
The above command will consume all messages from orders
topic:
Key (-1 bytes):
Value (90 bytes): 1:{"order_id":1,"order_ts":1534772501276,"total_amount":10.50,"customer_name":"Bob Smith"}
Partition: 0 Offset: 0
3.4. Query mode
Query mode allows to query offset by timestamp in the following format:
kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>
Consuming offset from a topic
$ docker run --tty --rm --interactive \
--network=host \
confluentinc/cp-kafkacat \
kafkacat -Q -b localhost:9092 -t orders:0:-1
-Q
— query mode-b
— broker host:port
-t
— topic to consume from
The above command will output:
orders [0] offset 6
Now that we have the offset, let’s query all messages after specified offset:
$ docker run --tty --rm --interactive \
--network=host \
confluentinc/cp-kafkacat \
kafkacat -q -b localhost:9092 -t orders -p 0 -o 5
-q
— be quite (verbosity set to 0)-p
— partition-o
— offset to start consuming from
4. Conclusion
That’s it for now. Hopefully, you learnt something interesting or useful ;)