Suppose, you have a task to implement dynamic routes configuration with Apache Camel. Or more concretely, you want to create Camel routes without having to add new route definitions just by changing configuration.
I recently encountered similar task, and as it turned out — there is a simple way to get things done . So, in this post I will share the solution.
1. Motivation
First, let’s define what initial state is and what we want to build.
Suppose we already have Spring Boot service, that uses Apache Camel for integration with other services and infrastructure. Specifically, the service produces messages to Apache Kafka.
As a software engineers, we know that in our dynamic world there is only one constant — requirements to our software will change. So, we want to build the system that is easy to maintain and extend. In this concrete instance we are going to build integration with Kafka in such a way, that producing messages to new topics will not require changing sources (defining new routes), but simply translates to adding new configurations.
2. Plan
Our approach will be the following:
Define routes configuration
Read configuration on service startup
Build Camel routes, based on defined configurations
Add routes to Camel context
Basically, that’s it. As I already let out earlier — nothing really complex is needed to solve our task.
3. Solution
For inpatient — here is the link to the source code. For readers, that want to get more details.
3.1. Define routes configuration
package com.oxymorus.configuration.properties;
import lombok.Data;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Map;
@Data
public class CamelKafkaProducerProperties {
@NotEmpty
private Map<String, @Valid ProducerProperties> producers;
@Data
public static class ProducerProperties {
@NotNull
private String brokers;
@NotNull
private String topic;
@NotNull
private String serializerClass = "org.apache.kafka.common.serialization.StringSerializer";
@NotNull
private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
public String getUri() {
return "kafka:start" +
"?brokers=" + brokers +
"&topic=" + topic +
"&serializerClass=" + serializerClass +
"&keySerializerClass=" + keySerializerClass;
}
}
}
Also, don’t forget to actually define several producer properties:
camel.kafka.producers.test0.brokers=localhost:9092
camel.kafka.producers.test0.topic=test1
camel.kafka.producers.test1.brokers=localhost:9092
camel.kafka.producers.test1.topic=test2
camel.kafka.producers.test2.brokers=localhost:9092
camel.kafka.producers.test2.topic=test3
3.2. Build configured routes
package com.oxymorus.listener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.CamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import static com.oxymorus.configuration.properties.CamelKafkaProducerProperties.ProducerProperties;
@Slf4j
@RequiredArgsConstructor
public class CamelKafkaProducerRoutesBuilder extends RouteBuilder {
private final String key;
private final ProducerProperties producerProperties;
public CamelKafkaProducerRoutesBuilder(CamelContext context, String key, ProducerProperties producerProperties) {
super(context);
this.key = key;
this.producerProperties = producerProperties;
}
@Override
public void configure() throws Exception {
from("direct:producer-" + key)
.log(LoggingLevel.INFO, "Sending to Kafka: ${body}")
.to(producerProperties.getUri())
.log("Successfully sent ${body}");
}
}
3.3. Initialize routes on service startup
package com.oxymorus.listener;
import com.oxymorus.configuration.properties.CamelKafkaProducerProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.CamelContext;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class CamelKafkaRoutesInitializingListener {
private final CamelKafkaProducerProperties producerProperties;
private final CamelContext camelContext;
@EventListener(ApplicationReadyEvent.class)
public void initializeRoutes() {
try {
log.info("Building Kafka producer routes: " + producerProperties);
List<CamelKafkaProducerRoutesBuilder> routesBuilders = producerRoutesBuilder();
for (CamelKafkaProducerRoutesBuilder routesBuilder : routesBuilders) {
camelContext.addRoutes(routesBuilder);
}
} catch (Exception exception) {
log.error("Failed to build dynamic routes: " + producerProperties, exception);
}
}
private List<CamelKafkaProducerRoutesBuilder> producerRoutesBuilder() {
return producerProperties.getProducers().entrySet().stream()
.map(e -> new CamelKafkaProducerRoutesBuilder(camelContext, e.getKey(), e.getValue()))
.collect(Collectors.toList());
}
}
4. Smoke Testing
As good people, we should test our software. But today I’m lazy and will do only manual testing.
Start local kafka broker
$ cd docker
$ docker-compose up
Start console consumer
$ docker run --tty --rm --interactive \
--network=host \
confluentinc/cp-kafkacat \
kafkacat -C -b localhost:9092 -t test1
Start the service
$ mvn clean install
$ mvn spring-boot:run
Or alternatively:
$ java -jar target/investigation-camel-dynamic-routes-1.0-SNAPSHOT.jar
5. Conclusions
So, as turned out — it’s not a big deal to dynamically add routes to Apache Camel. In this post I described a way for Kafka producer routes, but nothing prevents you to use the same approach for any other Camel routes.