Kafka con Spring Boot Parte Uno
En los últimos tiempos Spring Boot se ha convertido en el framework de facto para la creación de micro servicios. Uno de los patrones utilizados a la hora de crear microservicios es la coreografía en donde Kafka mediante el patrón event sourcing nos puede ser de gran ayuda. En esta entrada vamos a tratar como configurar kafka con Spring Boot para poder realizar el envío de mensajes.
¿Qué es Kafka?
Kafka es una plataforma de código abierto de procesamiento de streams, desarrollada por Apache. Esta plataforma escrita en Java y Scala, tiene como objetivo principal, proporcionar baja latencia para poder procesar datos en tiempo real. El protocolo que utiliza es TCP, en la que agrupa los mensajes para reducir la sobrecarga de ida y vuelta a la red, lo que convierte a Kafka a día de hoy en un de las plataformas más utilizadas para el envío y recepción de mensajes.
Conceptos básicos
No vamos a profundizar mucho más en kafka, tan solo comentaremos dos conceptos básicos:
- Topics: En kafka siempre vamos a enviar un mensaje a un topic y vamos a estar subscrito a un topic para la recepción.
- Grupos: Los subscritores siempre van a estar formando grupos, y kafka se encargará que un mensaje solo sea enviado a un único subscritor de un grupo.
Veremos en los siguientes puntos la configuración de estos conceptos.
Configuración de Kafka con Spring Boot
Dependencias de Kafka
Sin duda lo más fácil para empezar integrar kafka a una aplicación Spring Boot, es empezar con el proyecto initializr de Spring e indicar las dependencias a usar.
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Dependencia principal para el proyecto
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
Dependencia para test.
Arrancar un broker de kafka con Docker
Para continuar vamos a arrancar un kafka broker a través de docker:
$ docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost landoop/fast-data-dev:latest
Esta imagen de docker tiene una parte de administración con lo que le podremos, por ejemplo, desde postman inyectar mensajes.
Configurar Kafka en nuestra aplicación
En este tutorial vamos a ver como configurar un productor y un consumidor:
Configuración de Kafka en Spring Boot sin SSL
Configuración Productor
Gracias a la magia de la autoconfiguración de Spring Boot, con añadir la dependencia de Maven y unas propiedades en el fichero .yaml o properties, tendremos nuestro Productor listo para funcionar:
kafka.topic.name= myTopic spring.kafka.bootstrap-servers=kafkaserver:9092 spring.kafka.consumer.group-id=myGroup
Hemos definido el topic del mensaje, la ruta de conexión con el broker, si hay varios se separan con comas; y el grupo al que se vinculan los listeners.
Lo que hemos hecho al añadir la dependencia de Kafka ha sido crear un ProducerFactory, en el que se establece la estrategia de envío de mensajes.
Hay que tener en cuenta que kafka tiene muchas más propiedades, estas son las básicas y por defecto.
Enviar un mensaje desde el Productor
Una vez que hemos realizado la configuración básica, estamos listos para enviar nuestro primer mensaje.
@Component @RequiredArgsConstructor @Slf4j public class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; @Value(value = "${kafka.topic.name}") private String topic; public void sendMessage(String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(this.topic, message); future.addCallback(new ListenableFutureCallback<>() { @Override public void onSuccess(SendResult<String, String> result) { log.debug("Message {} has been sent ", message); } @Override public void onFailure(Throwable ex) { log.error("Something went wrong with the message {} ", message); } }); } }
Lo más importante de esta clase, es la clase KafkaTemplate, la cual es un recubrimiento del Producer y nos da métodos para poder trabajar con el. KafkaTemplate es thread-safe, y su uso es el recomendado para el envío de mensajes.
Como podemos apreciar el API de envío de mensajes nos devuelve un objeto ListenableFuture, con el que podemos bloquear el hilo de envío y obtener el resultado sobre el mensaje envío, pero esto hará algo más lento al productor.
La idea de kafka es procesar lo más rápido posible streams, por lo que es mejor que ese envío de mensajes sea de manera asíncrona a través de un callback, tal y como hemos hecho arriba. Este callback nos devolverá un onFailure si algo falló y un onSuccess si todo fue correcto.
Consumiendo mensajes
Para consumir mensajes necesitamo la Clase ConsumerFactory y KafkaListenerContainerFactory, ambas se han configurado al añadir la dependencia de Spring, así que podremos hacer uso de la anotación @KafkaListener para recibir los mensajes, en el topic específicado.
@Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = "${kafka.topic.name}") public void listener(String message) { log.debug("Message received {} ", message); //Do something } }
En esta clase como hemos comentado anteriormente hemos usado la anotación @KafkaListener con un topic para la recepción de mensaje de ese topic. Como no hemos definido ningúnn group-id tomará por defecto el configurado en spring.kafka.consumer.group-id.
Una vez hemos creado el consumidor solo nos queda poder probar nuestra aplicación.
Configuración de Kafka en Spring Boot con SSL
En el apartado anterior hemos visto como realizar la configuración sin SSL a nuestro broker de kafka, pero en el caso en el que queramos establecer mecanismos de seguridad podemos añadir un certificado y configuración por SSL. A continuación nos vamos a conectar a un broker con puerto 9094 que es seguro y le añadimos un trustore para la conexión. Para ver una configuración de kafka y despliegue de Kafka en Kubernetes, puedes echar un ojo a este artículo.
kafka.topic.name: test-strimzi-topic spring: kafka: security: protocol: "SSL" bootstrap-servers: 127.0.0.1:9094 ssl: trust-store-location: classpath:/kafka.jks trust-store-password: changeit consumer: group-id: demo-group-id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Creando un controlador para probar la aplicación.
Queremos probar la aplicación que hemos creado, por lo que vamos a añadir un controlador, para que de manera REST se encargue de enviar mensajes.
@RestController @RequiredArgsConstructor public class KafkaController { private final KafkaProducer kafkaProducer; @PostMapping("/messages/send") public ResponseEntity<String> sendMessage(@RequestBody String message) { kafkaProducer.sendMessage(message); return ResponseEntity.ok(message); } }
Arrancamos la aplicación, ejecutamos desde postman un post a http:/localhost:8080/messages/send , con un body «hola» y podemos ver por consola la siguientes salidas.
Resultado
Message hola has been sent
Message received hola
Conclusión
En esta entradade Kafka con Spring Boot Parte Uno hemos visto la configuración básica de producción y consumo de mensajes con Kafka, en los siguientes artículos iremos viendo diferentes y configuraciones más completas de kafka.
Puedes encontrar el ejemplo en nuestro github.
Otros artículos que te pueden interesar:
WebSocket con Stomp y Spring Boot
Coreografía VS Orquestación en Microservicios
Kafka en una Arquitectura de Microservicios
Hola, buen articulo que lo he descargado para probar el @Kafkalistener, pero no me pinta el log, he creado una clase, he jugado con el logger, pero no logro imprimir la traza que esta dentro del método @KafkaListener. Tu has podido lanzar las trazas por consola?
Hola,
Gracias, te has levantado kafka correctamente? Sino pinta el log ouede ser porque no te llegue el mensaje. Mira la consola de docker para ver si tienes algún error