Kafka Provision Spring Boot Starter enables distributed provisioning of Kafka topics and centralized management of topic configurations.
Overview
Kafka Provision Spring Boot Starter supports the following set of features:
creating new topics
adding partitions to the existing topics
setting/updating topic configurations
In this post, I will describe the process of creating an application that uses the starter.
Demo application description
To keep the focus on the main goal, the demo application will be really simple. We will create two "microservices":
The first service will produce tasks and push them to the tasks topic.
The second service will pull the tasks from the topic, sleep randomly, and send events to the results topic.
So, everything sounds really simple - let’s overengineer it as much as possible!
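For reference, the payloads exchanged between the services can be modeled as a couple of small Lombok value classes (Lombok is among our dependencies below). This is only a sketch based on the class names that show up in the demo logs later in this post; the actual classes in the demo repo may differ:
import lombok.Value;

// Hypothetical payload classes, named after what appears in the demo logs.
@Value
class TaskMessage {
    Task task;
}

@Value
class Task {
    Action action;
}

enum Action { SLEEP, EAT, WRITE_CODE }

@Value
class ResultMessage {
    Action action;
    Status status;
}

enum Status { SUCCESS, FAIL, SKIP_THIS_TIME }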
Setting up basic Spring Boot services
First, let’s bootstrap a Spring Boot app with Spring Cloud Stream, Spring Kafka and Lombok support:
$ curl https://start.spring.io/starter.zip \
-d dependencies=cloud-stream,kafka,lombok \
-d type=gradle-project \
-d baseDir=task-producer \
-d groupId=com.oxymorus.kafka.producer \
-d artifactId=task-producer \
-o task-producer.zip
$ unzip task-producer.zip && rm task-producer.zip
Just a quick note: Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
The core building blocks of Spring Cloud Stream are:
Destination Binders: components responsible for providing integration with external messaging systems.
Destination Bindings: the bridge between the external messaging systems and the application-provided producers and consumers of messages (created by the Destination Binders).
Message: the canonical data structure used by producers and consumers to communicate with Destination Binders (and thus with other applications via external messaging systems).
To get more details, just read the official reference here.
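To make the binding concept concrete, here is a minimal producer-side sketch, assuming the annotation-based programming model (@EnableBinding and the Source interface) that Spring Cloud Stream offered at the time. The class name TasksPublisher mirrors the demo logs, but the real implementation may differ:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

// Binds the application to the Source interface, which exposes a single "output" channel.
@EnableBinding(Source.class)
class TasksPublisher {

    private final Source source;

    TasksPublisher(Source source) {
        this.source = source;
    }

    // Wraps the payload in a Message and sends it through the binding;
    // the Kafka binder routes it to the destination topic configured for "output".
    void publish(TaskMessage message) {
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}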
Ok, let’s get back to our main course and bootstrap task-consumer service:
$ curl https://start.spring.io/starter.zip -d dependencies=cloud-stream,kafka,lombok \
-d type=gradle-project \
-d baseDir=task-consumer \
-d groupId=com.oxymorus.kafka.consumer \
-d artifactId=task-consumer \
-o task-consumer.zip
$ unzip task-consumer.zip && rm task-consumer.zip
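The consumer side will eventually do the mirror image: read a task, sleep, and report a result. Here is a sketch, again assuming the annotation-based model and the hypothetical message classes from above:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

import java.util.concurrent.ThreadLocalRandom;

// Processor exposes both an "input" and an "output" channel.
@EnableBinding(Processor.class)
class TasksListener {

    // Consumes a task from the input binding, sleeps for a random interval,
    // and publishes the outcome to the output binding.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    ResultMessage handle(TaskMessage message) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextLong(500, 3000));
        return new ResultMessage(message.getTask().getAction(), Status.SUCCESS);
    }
}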
Configuring Kafka topics
As described earlier, the two services will communicate over the Kafka topics tasks and results. So, we need to create and configure these topics. This is where Kafka Provision Spring Boot Starter comes in.
We will do this in three steps:
Add dependency
Add @EnableTopicProvisioning
Configure topics
Let’s do this procedure step by step for the task-producer service. First, let’s add the dependency to build.gradle:
dependencies {
implementation 'io.github.zghurskyi.kafka:kafka-provision-spring-boot-starter:0.0.1'
}
Next, let’s add @EnableTopicProvisioning to the TaskProducerApp class:
package com.oxymorus.kafka.producer;
import io.github.zghurskyi.kafka.EnableTopicProvisioning;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableTopicProvisioning
public class TaskProducerApp {
public static void main(String[] args) {
SpringApplication.run(TaskProducerApp.class, args);
}
}
And finally, let’s configure the required topics:
kafka.provision:
brokers: localhost:9092
topics:
- name: tasks
numPartitions: 4
replicationFactor: 1
configs:
cleanup.policy: delete
retention.ms: 3600000
- name: results
numPartitions: 4
replicationFactor: 1
configs:
cleanup.policy: delete
retention.ms: 3600000
The above steps are similar for the task-consumer service.
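Under the hood, declarative provisioning like this boils down to calls to Kafka’s AdminClient API. For intuition, creating the tasks topic by hand would look roughly like the following; this is a sketch of the general approach, not the starter’s actual implementation:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ManualTopicProvisioning {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "delete");
            configs.put("retention.ms", "3600000");

            // name, numPartitions, replicationFactor -- mirroring the YAML above
            NewTopic tasks = new NewTopic("tasks", 4, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(tasks)).all().get();
        }
    }
}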
The details of setting up Spring Cloud Stream & Kafka in a Spring Boot app deserve a separate blog post, so to stay on point I will skip them. You can find the completed demo app here.
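That said, to give a rough idea of how the channels get wired to the provisioned topics, the binding configuration for task-producer might look something like this (a sketch assuming the default Source/Processor channel names; the actual demo config may differ):
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        output:
          destination: tasks
        input:
          destination: results
          group: task-producer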
Setting up Kafka infrastructure
For the purposes of this demo, we set up the infrastructure with the following docker-compose.yml:
version: '3'
services:
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_MESSAGE_MAX_BYTES: 10000000
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
Putting everything together
The time has come to start everything up:
Boot up Kafka with docker-compose.yml:
$ cd kafka-provision-examples/
$ docker-compose up
Build and start task-producer:
$ ./task-producer/gradlew -b ./task-producer/build.gradle clean build
$ java -jar task-producer/build/libs/task-producer-0.0.1-SNAPSHOT.jar
Build and start task-consumer:
$ ./task-consumer/gradlew -b ./task-consumer/build.gradle clean build
$ java -jar task-consumer/build/libs/task-consumer-0.0.1-SNAPSHOT.jar
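If something misbehaves, a quick sanity check is to confirm that the broker itself came up cleanly, e.g. by grepping the container logs for the server startup line (the exact wording may vary between Kafka versions):
$ docker logs kafka 2>&1 | grep "started (kafka.server.KafkaServer)"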
After starting everything up, we will see something like this in the logs:
task_producer | 2019-04-21 10:27:49.071 INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener : Received: ResultMessage(action=EAT, status=SUCCESS)
task_producer | 2019-04-21 10:27:49.191 INFO 1 --- [ scheduling-1] c.o.kafka.bindings.TasksPublisher : Published: TaskMessage(task=Task(action=SLEEP))
task_producer | 2019-04-21 10:27:49.413 INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener : Received: ResultMessage(action=WRITE_CODE, status=SUCCESS)
task_producer | 2019-04-21 10:27:50.191 INFO 1 --- [ scheduling-1] c.o.kafka.bindings.TasksPublisher : Published: TaskMessage(task=Task(action=SLEEP))
task_producer | 2019-04-21 10:27:50.826 INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener : Received: ResultMessage(action=EAT, status=FAIL)
task_producer | 2019-04-21 10:27:52.945 INFO 1 --- [container-0-C-1] c.o.kafka.bindings.ResultsListener : Received: ResultMessage(action=SLEEP, status=SKIP_THIS_TIME)
task_producer | 2019-04-21 10:27:53.191 INFO 1 --- [ scheduling-1] c.o.kafka.bindings.TasksPublisher : Received: ResultMessage(action=WRITE_CODE, status=SUCCESS)
This indicates that everything works as it’s supposed to :)
Checking topics configuration
Now let’s check out the Kafka topic configs that were provisioned by the starter:
$ docker exec -ti kafka /bin/bash
root@4874c1726187:/# kafka-topics --zookeeper zookeeper:2181 --list
__confluent.support.metrics
__consumer_offsets
results
tasks
root@4874c1726187:/# kafka-topics --zookeeper zookeeper:2181 --describe --topic tasks
Topic:tasks PartitionCount:4 ReplicationFactor:1 Configs:retention.ms=3600000,cleanup.policy=delete
Topic: tasks Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: tasks Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: tasks Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: tasks Partition: 3 Leader: 1 Replicas: 1 Isr: 1
root@4874c1726187:/# kafka-topics --zookeeper zookeeper:2181 --describe --topic results
Topic:results PartitionCount:4 ReplicationFactor:1 Configs:retention.ms=3600000,cleanup.policy=delete
Topic: results Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: results Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: results Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: results Partition: 3 Leader: 1 Replicas: 1 Isr: 1
So, as we can see, Kafka Provision Spring Boot Starter has created the required topics for us and applied the specified configs.
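Finally, a note on the other two advertised features: adding partitions and updating topic configs. To try them out, one could simply edit the same application.yml and restart the service. Here is a hypothetical change, not part of the demo repo:
kafka.provision:
  brokers: localhost:9092
  topics:
    - name: tasks
      numPartitions: 8          # was 4 -- the starter should add the missing partitions
      replicationFactor: 1
      configs:
        cleanup.policy: delete
        retention.ms: 7200000   # was 3600000 -- the starter should update the topic config
Re-running kafka-topics --describe afterwards should reflect the new partition count and retention.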