Kafka con Spring Boot Parte Uno


En los últimos tiempos Spring Boot se ha convertido en el framework de facto para la creación de micro servicios. Uno de los patrones utilizados a la hora de crear microservicios es la coreografía en donde Kafka mediante el patrón event sourcing nos puede ser de gran ayuda. En esta entrada vamos a tratar como configurar kafka con Spring Boot para poder realizar el envío de mensajes.

¿Qué es Kafka?

kafka con Spring Boot
kafka con Spring Boot

Kafka es una plataforma de código abierto de procesamiento de streams, desarrollada por Apache. Esta plataforma escrita en Java y Scala, tiene como objetivo principal, proporcionar baja latencia para poder procesar datos en tiempo real. El protocolo que utiliza es TCP, en la que agrupa los mensajes para reducir la sobrecarga de ida y vuelta a la red, lo que convierte a Kafka a día de hoy en un de las plataformas más utilizadas para el envío y recepción de mensajes.

Conceptos básicos

No vamos a profundizar mucho más en kafka, tan solo comentaremos dos conceptos básicos:

  • Topics: En kafka siempre vamos a enviar un mensaje a un topic y vamos a estar subscrito a un topic para la recepción.
  • Grupos: Los subscritores siempre van a estar formando grupos, y kafka se encargará que un mensaje solo sea enviado a un único subscritor de un grupo.

Veremos en los siguientes puntos la configuración de estos conceptos.

Configuración de Kafka con Spring Boot

Dependencias de Kafka

Sin duda lo más fácil para empezar integrar kafka a una aplicación Spring Boot, es empezar con el proyecto initializr de Spring e indicar las dependencias a usar.

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

Dependencia principal para el proyecto

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

Dependencia para test.

Arrancar un broker de kafka con Docker

Para continuar vamos a arrancar un kafka broker a través de docker:

$ docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost landoop/fast-data-dev:latest

Esta imagen de docker tiene una parte de administración con lo que le podremos, por ejemplo, desde postman inyectar mensajes.

Configurar Kafka en nuestra aplicación

En este tutorial vamos a ver como configurar un productor y un consumidor:

Configuración de Kafka en Spring Boot sin SSL

Configuración Productor

Gracias a la magia de la autoconfiguración de Spring Boot, con añadir la dependencia de Maven y unas propiedades en el fichero .yaml o properties, tendremos nuestro Productor listo para funcionar:

kafka.topic.name= myTopic
spring.kafka.bootstrap-servers=kafkaserver:9092
spring.kafka.consumer.group-id=myGroup

Hemos definido el topic del mensaje, la ruta de conexión con el broker, si hay varios se separan con comas; y el grupo al que se vinculan los listeners.

Lo que hemos hecho al añadir la dependencia de Kafka ha sido crear un ProducerFactory, en el que se establece la estrategia de envío de mensajes.

Hay que tener en cuenta que kafka tiene muchas más propiedades, estas son las básicas y por defecto.

Enviar un mensaje desde el Productor

Una vez que hemos realizado la configuración básica, estamos listos para enviar nuestro primer mensaje.

@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

  private final KafkaTemplate<String, String> kafkaTemplate;

  @Value(value = "${kafka.topic.name}")
  private String topic;

  public void sendMessage(String message) {

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(this.topic, message);
    future.addCallback(new ListenableFutureCallback<>() {
      @Override
      public void onSuccess(SendResult<String, String> result) {
        log.debug("Message {} has been sent ", message);
      }
      @Override
      public void onFailure(Throwable ex) {
        log.error("Something went wrong with the message {} ", message);
      }
    });
  }
}

Lo más importante de esta clase, es la clase KafkaTemplate, la cual es un recubrimiento del Producer y nos da métodos para poder trabajar con el. KafkaTemplate es thread-safe, y su uso es el recomendado para el envío de mensajes.

Como podemos apreciar el API de envío de mensajes nos devuelve un objeto ListenableFuture, con el que podemos bloquear el hilo de envío y obtener el resultado sobre el mensaje envío, pero esto hará algo más lento al productor.

La idea de kafka es procesar lo más rápido posible streams, por lo que es mejor que ese envío de mensajes sea de manera asíncrona a través de un callback, tal y como hemos hecho arriba. Este callback nos devolverá un onFailure si algo falló y un onSuccess si todo fue correcto.

Consumiendo mensajes

Para consumir mensajes necesitamo la Clase ConsumerFactory y  KafkaListenerContainerFactory, ambas se han configurado al añadir la dependencia de Spring, así que podremos hacer uso de la anotación @KafkaListener para recibir los mensajes, en el topic específicado.

@Component
@Slf4j
public class KafkaConsumer {


  @KafkaListener(topics = "${kafka.topic.name}")
  public void listener(String message) {
    log.debug("Message received {} ", message);

    //Do something
  }


}

En esta clase como hemos comentado anteriormente hemos usado la anotación @KafkaListener con un topic para la recepción de mensaje de ese topic. Como no hemos definido ningúnn group-id tomará por defecto el configurado en  spring.kafka.consumer.group-id.

Una vez hemos creado el consumidor solo nos queda poder probar nuestra aplicación.

Configuración de Kafka en Spring Boot con SSL

En el apartado anterior hemos visto como realizar la configuración sin SSL a nuestro broker de kafka, pero en el caso en el que queramos establecer mecanismos de seguridad podemos añadir un certificado y configuración por SSL. A continuación nos vamos a conectar a un broker con puerto 9094 que es seguro y le añadimos un trustore para la conexión. Para ver una configuración de kafka y despliegue de Kafka en Kubernetes, puedes echar un ojo a este artículo.

kafka.topic.name: test-strimzi-topic


spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: 127.0.0.1:9094
    ssl:
      trust-store-location: classpath:/kafka.jks
      trust-store-password: changeit
    consumer:
      group-id: demo-group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Creando un controlador para probar la aplicación.

Queremos probar la aplicación que hemos creado, por lo que vamos a añadir un controlador, para que de manera REST se encargue de enviar mensajes.

@RestController
@RequiredArgsConstructor
public class KafkaController {

  private final KafkaProducer kafkaProducer;


  @PostMapping("/messages/send")
  public ResponseEntity<String> sendMessage(@RequestBody String message) {

    kafkaProducer.sendMessage(message);

    return ResponseEntity.ok(message);
  }

}

Arrancamos la aplicación, ejecutamos desde postman un post a http:/localhost:8080/messages/send , con un body «hola» y podemos ver por consola la siguientes salidas.

Resultado

Message hola has been sent 
Message received hola

Conclusión

En esta entradade Kafka con Spring Boot Parte Uno hemos visto la configuración básica de producción y consumo de mensajes con Kafka, en los siguientes artículos iremos viendo diferentes y configuraciones más completas de kafka.

Puedes encontrar el ejemplo en nuestro github.

Otros artículos que te pueden interesar:

WebSocket con Stomp y Spring Boot

Coreografía VS Orquestación en Microservicios

Kafka en una Arquitectura de Microservicios


3 pensamientos sobre “Kafka con Spring Boot Parte Uno

  1. Hola, buen articulo que lo he descargado para probar el @Kafkalistener, pero no me pinta el log, he creado una clase, he jugado con el logger, pero no logro imprimir la traza que esta dentro del método @KafkaListener. Tu has podido lanzar las trazas por consola?

    1. Hola,

      Gracias, te has levantado kafka correctamente? Sino pinta el log ouede ser porque no te llegue el mensaje. Mira la consola de docker para ver si tienes algún error

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *