
Kafka in Spring Boot

In recent times, Spring Boot has become the de facto framework for creating microservices. One of the patterns used in microservice creation is choreography, where Kafka, using the event sourcing pattern, can be of great help. In this post, we will discuss how to configure Kafka with Spring Boot to enable message sending.

What is Kafka?


Kafka is an open-source stream processing platform developed by Apache. This platform, written in Java and Scala, aims to provide low-latency processing for real-time data. It uses the TCP protocol and groups messages to reduce network round-trip overhead, making Kafka one of the most widely used platforms for message sending and receiving.

Basic Concepts About Kafka

We won’t delve too much into Kafka, but we’ll briefly mention two basic concepts:

  • Topics: In Kafka, we always send messages to a topic and subscribe to a topic to receive them.
  • Consumer Groups: Subscribers are organized into groups, and Kafka ensures that each message is delivered to only one subscriber within a group.

We’ll cover the configuration of these concepts in the following sections.
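The one-subscriber-per-group guarantee can be sketched in a few lines of plain Java. This is an illustration of the delivery semantics only (real Kafka achieves it by assigning topic partitions to group members, not by round-robin dispatch):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupSketch {

  // Toy illustration of the consumer-group guarantee: each message goes to
  // exactly ONE member of each group, while every group gets its own copy.
  static Map<String, List<String>> deliver(List<String> messages,
                                           Map<String, Integer> groupSizes) {
    Map<String, List<String>> received = new HashMap<>();
    for (Map.Entry<String, Integer> group : groupSizes.entrySet()) {
      for (int i = 0; i < messages.size(); i++) {
        // Pick one member of the group (round-robin here; real Kafka
        // distributes the topic's partitions among the members instead)
        String member = group.getKey() + "-consumer-" + (i % group.getValue());
        received.computeIfAbsent(member, k -> new ArrayList<>()).add(messages.get(i));
      }
    }
    return received;
  }

  public static void main(String[] args) {
    Map<String, Integer> groups = new HashMap<>();
    groups.put("billing", 2); // two consumers share the messages
    groups.put("audit", 1);   // a single consumer receives everything
    System.out.println(deliver(List.of("m1", "m2", "m3"), groups));
  }
}
```

With two consumers in the billing group, the messages are split between them, while the single-consumer audit group still receives every message.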

Configuring Kafka with Spring Boot

Kafka Dependencies

The easiest way to integrate Kafka into a Spring Boot application is to start with the Spring Initializr project and specify the dependencies to use.

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

This is the main dependency for the project.

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

This is the dependency for testing.

Starting a Kafka Broker with Docker

To continue, let’s start a Kafka broker using Docker:

$ docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost landoop/fast-data-dev:latest

This Docker image includes an administration component, which allows, for example, injecting messages using Postman.

Configuring Kafka in Our Application

In this tutorial, we’ll configure a producer and a consumer.

Kafka Configuration in Spring Boot without SSL

Producer Configuration

Thanks to the magic of Spring Boot’s auto-configuration, by adding the Maven dependency and some properties in the .yaml or .properties file, our producer will be ready to work:

kafka.topic.name=myTopic
spring.kafka.bootstrap-servers=kafkaserver:9092
spring.kafka.consumer.group-id=myGroup

We have defined the message topic, the connection to the broker (if there are multiple brokers, they are separated by commas), and the group to which the listeners are associated.

By adding the Kafka dependency, Spring Boot auto-configures a ProducerFactory, which sets the message delivery strategy.

Keep in mind that Kafka has many more properties, but these are the basic and default ones.
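For example, a few commonly tuned producer properties can be set the same way (the values here are illustrative, not recommendations):

```properties
# Wait for acknowledgement from all in-sync replicas before a send counts as successful
spring.kafka.producer.acks=all
# Retry transient send failures automatically
spring.kafka.producer.retries=3
# Any native Kafka producer property can be passed through the properties map,
# e.g. batching messages for up to 5 ms to reduce network round trips
spring.kafka.producer.properties.linger.ms=5
```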

Sending a Message from the Producer

Once we have done the basic configuration, we are ready to send our first message.

@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

  private final KafkaTemplate<String, String> kafkaTemplate;

  @Value(value = "${kafka.topic.name}")
  private String topic;

  public void sendMessage(String message) {

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(this.topic, message);
    future.addCallback(new ListenableFutureCallback<>() {
      @Override
      public void onSuccess(SendResult<String, String> result) {
        log.debug("Message {} has been sent ", message);
      }
      @Override
      public void onFailure(Throwable ex) {
        log.error("Something went wrong with the message {} ", message);
      }
    });
  }
}

The most important part of this class is the KafkaTemplate class, which is a wrapper around the Producer and provides us with methods to work with it. KafkaTemplate is thread-safe, and its use is recommended for message sending.

As we can see, the message sending API returns a ListenableFuture object, which allows us to block the sending thread and obtain the result of the message sending. However, this can slow down the producer.

The idea of Kafka is to process streams as quickly as possible, so it is better to send messages asynchronously through a callback, as we have done above. This callback will return an onFailure if something went wrong and an onSuccess if everything was successful.
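As of spring-kafka 3.0 (the version used by Spring Boot 3), send() returns a CompletableFuture instead of a ListenableFuture, and the callback is attached with whenComplete. The same pattern looks like this plain-Java sketch, where the send method is just a stand-in for kafkaTemplate.send:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSendSketch {

  // Stand-in for kafkaTemplate.send(topic, message), which in spring-kafka 3.x
  // also returns a CompletableFuture (of SendResult<K, V>).
  static CompletableFuture<String> send(String message) {
    return CompletableFuture.supplyAsync(() -> message);
  }

  public static void main(String[] args) {
    send("hello").whenComplete((result, ex) -> {
      if (ex == null) {
        System.out.println("Message " + result + " has been sent");
      } else {
        System.err.println("Something went wrong: " + ex.getMessage());
      }
    }).join(); // join() only so the demo waits; the callback itself never blocks
  }
}
```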

Consuming messages

To consume messages, we need the ConsumerFactory class and the KafkaListenerContainerFactory class, both of which are configured when adding the Spring dependency. This allows us to use the @KafkaListener annotation to receive messages on the specified topic.

@Component
@Slf4j
public class KafkaConsumer {

  @KafkaListener(topics = "${kafka.topic.name}")
  public void listener(String message) {
    log.debug("Message received {} ", message);

    //Do something
  }
}

In this class, as we mentioned before, we used the @KafkaListener annotation with a topic to receive messages from it. Since we haven’t defined any group-id, it will use the one configured in spring.kafka.consumer.group-id by default (a different group can be set per listener through the annotation’s groupId attribute).

Once we have created the consumer, all that’s left is to test our application.

Configuring Kafka in Spring Boot with SSL

In the previous section, we saw how to configure our Kafka broker without SSL. However, if we want to establish security mechanisms, we can add a certificate and SSL configuration in our Kafka configuration. Here, we will connect to a secure broker on port 9094 and add a truststore for the connection.

kafka.topic.name: test-strimzi-topic


spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: 127.0.0.1:9094
    ssl:
      trust-store-location: classpath:/kafka.jks
      trust-store-password: changeit
    consumer:
      group-id: demo-group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Creating a Controller to Test the Application

To test the application we have created, let’s add a controller that handles sending messages via REST.

@RestController
@RequiredArgsConstructor
public class KafkaController {

  private final KafkaProducer kafkaProducer;


  @PostMapping("/messages/send")
  public ResponseEntity<String> sendMessage(@RequestBody String message) {

    kafkaProducer.sendMessage(message);

    return ResponseEntity.ok(message);
  }

}

Start the application, execute a POST request from Postman to http://localhost:8080/messages/send with the body “hello”, and you should see the following outputs in the console:

Result

Message hello has been sent
Message received hello

Conclusion

In this entry of Kafka in Spring Boot with example, we have seen the basic configuration for message production and consumption with Kafka. In the following articles, we will explore different and more advanced configurations of Kafka.

You can find the example on our GitHub.

If you need more information, you can leave us a comment or send an email to refactorizando.web@gmail.com. You can also contact us through our social media channels on Facebook or Twitter, and we will be happy to assist you!

Other articles you may be interested in:

Orchestration vs choreography in microservices

Kafka in microservices
