In recent times, Spring Boot has become the de facto framework for creating microservices. One of the patterns used in microservice creation is choreography, where Kafka, using the event sourcing pattern, can be of great help. In this post, we will discuss how to configure Kafka with Spring Boot to enable message sending.
What is Kafka?
Kafka is an open-source stream processing platform developed by Apache. This platform, written in Java and Scala, aims to provide low-latency processing for real-time data. It uses the TCP protocol and groups messages to reduce network round-trip overhead, making Kafka one of the most widely used platforms for message sending and receiving.
Basic Concepts about kafka
We won’t delve too much into Kafka, but we’ll briefly mention two basic concepts:
- Topics: In Kafka, we always send a message to a topic and subscribe to a topic for receiving.
- Consumer Groups: Subscribers always form groups, and Kafka ensures that a message is only sent to a single subscriber within a group.
We’ll cover the configuration of these concepts in the following sections.
Configuring Kafka with Spring Boot
Kafka Dependencies
The easiest way to integrate Kafka into a Spring Boot application is to start with the Spring Initializr project and specify the dependencies to use.
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
This is the main dependency for the project.
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
This is the dependency for testing.
Starting a Kafka Broker with Docker
To continue, let’s start a Kafka broker using Docker:
$ docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost landoop/fast-data-dev:latest
This Docker image includes an administration component, which allows, for example, injecting messages using Postman.
Configuring Kafka in Our Application
In this tutorial, we’ll configure a producer and a consumer.
Kafka Configuration in Spring Boot without SSL
Producer Configuration
Thanks to the magic of Spring Boot’s auto-configuration, by adding the Maven dependency and some properties in the .yaml
or .properties
file, our producer will be ready to work:
kafka.topic.name= myTopic spring.kafka.bootstrap-servers=kafkaserver:9092 spring.kafka.consumer.group-id=myGroup
We have defined the message topic, the connection to the broker (if there are multiple brokers, they are separated by commas), and the group to which the listeners are associated.
By adding the Kafka dependency, we have created a ProducerFactory
, which sets the message delivery strategy.
Keep in mind that Kafka has many more properties, but these are the basic and default ones.
Sending a Message from the Producer
Once we have done the basic configuration, we are ready to send our first message.
@Component @RequiredArgsConstructor @Slf4j public class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; @Value(value = "${kafka.topic.name}") private String topic; public void sendMessage(String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(this.topic, message); future.addCallback(new ListenableFutureCallback<>() { @Override public void onSuccess(SendResult<String, String> result) { log.debug("Message {} has been sent ", message); } @Override public void onFailure(Throwable ex) { log.error("Something went wrong with the message {} ", message); } }); } }
The most important part of this class is the KafkaTemplate class, which is a wrapper around the Producer and provides us with methods to work with it. KafkaTemplate is thread-safe, and its use is recommended for message sending.
As we can see, the message sending API returns a ListenableFuture object, which allows us to block the sending thread and obtain the result of the message sending. However, this can slow down the producer.
The idea of Kafka is to process streams as quickly as possible, so it is better to send messages asynchronously through a callback, as we have done above. This callback will return an onFailure if something went wrong and an onSuccess if everything was successful.
Consuming messages
To consume messages, we need the ConsumerFactory class and the KafkaListenerContainerFactory class, both of which are configured when adding the Spring dependency. This allows us to use the @KafkaListener annotation to receive messages on the specified topic.
@Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = "${kafka.topic.name}") public void listener(String message) { log.debug("Message received {} ", message); //Do something } }
In this class, as we mentioned before, we used the @KafkaListener annotation with a topic for receiving messages from that topic. Since we haven’t defined any group-id, it will take the one configured in spring.kafka.consumer.group-id by default.
Once we have created the consumer, all that’s left is to test our application.
Configuring Kafka in Spring Boot with SSL
In the previous section, we saw how to configure our Kafka broker without SSL. However, if we want to establish security mechanisms, we can add a certificate and SSL configuration in our Kafka configuration. Here, we will connect to a secure broker on port 9094 and add a truststore for the connection.
kafka.topic.name: test-strimzi-topic spring: kafka: security: protocol: "SSL" bootstrap-servers: 127.0.0.1:9094 ssl: trust-store-location: classpath:/kafka.jks trust-store-password: changeit consumer: group-id: demo-group-id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Creating a controller to test the application.
To test the application we have created, let’s add a controller that handles sending messages via REST.
@RestController @RequiredArgsConstructor public class KafkaController { private final KafkaProducer kafkaProducer; @PostMapping("/messages/send") public ResponseEntity<String> sendMessage(@RequestBody String message) { kafkaProducer.sendMessage(message); return ResponseEntity.ok(message); } }
Start the application, execute a POST request from Postman to http://localhost:8080/messages/send with the body “hello”, and you should see the following outputs in the console:
Result
Message "hello" has been sent.
Message received: "hello"
Conclusion
In this entry of Kafka in Spring Boot with example, we have seen the basic configuration for message production and consumption with Kafka. In the following articles, we will explore different and more advanced configurations of Kafka.
You can find the example on our GitHub.
Other articles you may be interested in:
Orchestration vs choreography in microservices