Patrón Saga con Quarkus y Kafka
En esta nueva entrada vamos a ver un ejemplo práctico sobre el Patrón Saga con Quarkus y Kafka.
Como ya hemos hablado alguna vez de kafka, esté nos ofrece una plataforma distribuida que ofrece una serie de características que nos permite trabajar con mensajes entre servicios de manera asíncrona. Kafka es altamente escalable y tolerante a fallos gracias a su sistema de replicación y de líder.
En este artículo vamos a construir una arquitectura con Saga en la que la comunicación entre los diferentes servicios se establezca a través de mensajes de forma asíncrona con Kafka. Para poder realizar este artículo vamos a crear tres servicios diferentes, estos representarán un modelo simple de seleccionar un asiento en un cine.
¿Qué es el Patrón Saga?
Como ya vimos en un artículo anterior de refactorizando, podríamos definir el patrón Saga como una secuencia de transacciones entre microservicios. Que a diferencia del patrón 2PC (Two Phases Commit), no vamos a realizar la transacción en una única petición, ni cada participante va a poder revertir la transacción de manera independiente; sino que va a tener que notificar a cada participante que la transacción tiene que ser revertida.
Una transacción sería la unidad de trabajo de cada participante de nuestra arquitectura de Saga. Cada operación que realicen los servicios deben de permitir una compensación, es decir, un rollback. Además siempre debemos asegurar que nuestras transacciones han finalizado correctamente, en caso contrario realizaremos la compensación.
Hands On con Quarkus y Apache Kafka
A continuación vamos a construir un ejemplo de Patrón Saga con Quarkus y Kafka, el cual consistirá en un usuario que quiere seleccionar un asiento para ver una película, este asiento se bloqueará y enviará un evento para realizar el pago y finalmente se realizará la actualización de los asientos. Si en algún momento del proceso hay algún error o problema, se realizará un sistema de compensación para volver atrás y dejar el sistema en el mismo estado anterior. Los microservicios serán Order, Payment y Allocate.
Si quieres ver el ejemplo directamente puedes consultarlo en nuestro github.
Un patrón saga se puede realizar a través de un orquestador o de una coreografía, en nuestro ejemplo hemos optado por una coreografía.
El flujo de nuestro sistema se muestra a continuación
En el siguiente dibujo vamos a ver a través de que topics se van a notificar los mensajes de los productores y consumidores en nuestra solución:
Cada servicio constará de una base de datos en memoria H2 que gestionaremos haciendo uso de Hibernate Panache.
A continuación vamos a definir la configuración necesaria para añadir Kafka y nuestra base de datos H2, para continuar definiendo cada servicio.
Configuración de nuestros servicios en Quarkus
A continuación vamos a añadir la configuración básica de los tres servicios que vamos a crear, para ello vamos a añadir las dependencias necesarias en el pom.xml de cada servicio:
Depedencias Maven en Quarkus con Kafka
Para comenzar con la creación de nuestro ejemplo vamos a empezar agregando a nuestros pom, quarkus-smallrye-reactive-messaging-kafka
y quarkus-resteasy-jackson
.
O podemos ir a la página de Quarkus y generar el proyecto en el siguiente enlace, https://code.quarkus.io/, o finalmente tambień podemos crear nuestro proyecto haciendo uso de maven para la generación del arquetipo con las dependencias necesarias:
mvn io.quarkus:quarkus-maven-plugin:2.0.1.Final:create \ -DprojectGroupId=com.refactorizando.sample.saga \ -DprojectArtifactId=order-service \ -DclassName="com.refactorizando.sample.saga.OrderResource" \ -Dpath="/orders" \ -Dextensions="resteasy,smallrye-reactive-messaging-kafka,jdbc-h2"
Este proceso vamos a hacerlo tres veces para crear nuestros tres servicios (Order, payment y Allocate).
Configuración de Quarkus con Kafka
Vamos a añadir la autoconfiguración de Quarkus para que pueda conectarse a kafka y a los topics. Para ello vamos a levantar una imágen de kafka con Docker:
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost landoop/fast-data-dev:latest
En nuestro fichero de propiedades de nuestra carpeta resources (application.properties), vamos a añadir nuestra configuración de kafka. Para definir la configuración vamos a diferenciar entre consumidor y productor através de outgoing e incoming.
Al tratarse de una arquitectura con Saga, nuestros servicios realizarán compensación a través de mensajes por lo que serán tanto productores como consumidores.
Por ejemplo la configuración para nuestro servicio order será:
# The Kafka broker location (defaults to localhost:9092) kafka.bootstrap.servers=localhost:9092 # Configuring the incoming channel (reading from Kafka) mp.messaging.incoming.seats-in.connector=smallrye-kafka mp.messaging.incoming.seats-in.topic=status-update mp.messaging.incoming.seats-in.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer mp.messaging.incoming.seats-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer # Configuring the outgoing channel (writing to Kafka) mp.messaging.outgoing.seats-out.connector=smallrye-kafka mp.messaging.outgoing.seats-out.topic=seats mp.messaging.outgoing.seats-out.key.serializer=org.apache.kafka.common.serialization.LongSerializer mp.messaging.outgoing.seats-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
En la configuración vemos como tenemos un productor, el cual enviará un mensaje al servicio payment, y un consumidor el cual recibirá tanto la compensación como la actualización del asiento seleccionado. La configuración anterior pertenece al servicio Order-Service.
Además podemos ver la serialización y deserialización a usar y la dirección de kafka que hemos levantado haciendo uso de nuestra imágen docker.
Configuración de Quarkus con H2
En nuestro ejemplo vamos a hacer uso de una base de datos en memoria con Hibernate Panache, para ello vamos a realizar la siguiente configuración en nuestro fichero de propiedades (application.properties).
#Configuring database quarkus.datasource.db-kind=h2 quarkus.datasource.username=sa quarkus.datasource.password=password quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb quarkus.hibernate-orm.database.generation=drop-and-create quarkus.hibernate-orm.log.sql=true
Además vamos a cargar un Script sql para llenar nuestra base de datos al arrancar la aplicación de manera automática, para ello vamos a añadir la siguiente línea:
quarkus.hibernate-orm.sql-load-script=load-data.sql
Y para poder tener datos en nuestra base de datos vamos a escribir los siguientes inserts en cada servicio:
INSERT INTO USER(id, name, surname, email) VALUES(1, 'noel', 'rodriguez', 'refactorizando.web@gmail.com'); INSERT INTO SEAT(id, column, row, status, user_id) VALUES(1, 1, 1, 'FREE',1);
Creación de Order Service
Una vez que hemos añadido y creado nuestros proyectos con la configuración básica, y hemos entendido como definir la configuración a kafka y H2 vamos a comenzar a crear nuestro patrón Saga.
Para empezar vamos a crear nuestro primer servicio Order Service, el cual además será el punto de entrada de un usuario. Para ello vamos a definir el código necesario para su implementación en el que por sencillez vamos a saltarnos algún paso (como definición del modelo o métodos e implementación del repositorio), si lo necesitas, puedes ver el código completo y la implementación completa pulsando aquí.
Este servicio se encargará de reservar un asiento por un cliente, y el asiento cambiará su estado a bloqueado. Una vez se produzca el cambio de estado (Event Driven) se enviará un evento a Payment. En el caso en el que Payment no pueda realizar el cobro o exista algún error se comepensará y el asiento volverá a estar libre. Vamos a ver los pasos más en detalle, divididos en la transacción y en la compensación:
Creación de la transacción
Creamos nuestro endpoint de entrada, el cual será un post con un asiento y un usuario:
{ "id":1, "column": 1, "row":1, "user":{ "id":1, "name":"noel" } }
@POST public Response orderSeat(Seat seat) { log.info("New order received "); return Response.status(Response.Status.CREATED) .entity(service.reservedSeat(seat)).build(); }
Esta petición actualizará en base de datos el estado del asiento a estado bloqueado, y en el momento en el que cambia el estado, se emitirá un evento.
public Seat reservedSeat(Seat seat) { log.info("Update seat ", seat.getId()); var seatToSave = seatService.lockSeat(seat.getId()); seatEventProducer.sendOrder(seatToSave); return seatToSave; }
En el fragmento de código anterior, hemos actualizado el estado a bloqueado del asiento y hemos envíado un evento. A continuación vamos a ver como crear un evento para enviar un mensaje en Quarkus a Kafka, el cual se encargará de notificarlo a Payment:
@ApplicationScoped @Slf4j public class SeatEventProducer { @Inject @Channel("seats-out") Emitter<Record<Long, String>> emitter; @SneakyThrows public void sendOrder(Seat seat) { log.info("Event sent {}", seat.getId()); ObjectMapper objectMapper = new ObjectMapper(); var seatJson = objectMapper.writeValueAsString(seat); emitter.send(Record.of(seat.getId(), seatJson)) .whenComplete((success, failure) -> { if (failure != null) { log.error("D'oh! " + failure.getMessage()); } else { log.info("Message processed successfully"); } }); } }
La clase anterior es la encargada de enviar un mensaje. En este caso vamos a enviar un objeto seat.
Para poder enviar un mensaje a kafka en Quarkus tenemos que inyectar la dependencia de Emitter, que enviará mensajes a ese canal, en este caso seat-out. El cual ha sido configurado en nuestro application.properties.
Vamos a enviar objetos record, compuestos de la clave del objeto seat y del body.
Creación de la compensación
Tal y como hemos comentado en este tipo de arquitecturas, tenemos que asegurarnos que la transacción ha sido realizada en caso contrario deberemos de tener un sistema de compensación.
Para nuestro ejemplo vamos a realizar una compensación cuando un pago no se pueda realizar o cuando una actualización de asiento no pueda ser realizada. Para esos dos casos vamos a crear un consumidor que se encargará de realizar estos pasos:
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class SeatEventConsumer { private final SeatService seatService; @SneakyThrows @Incoming("seats-in") public void receive(Record<Long, String> record) { log.info("record key is: {}", record.key()); ObjectMapper objectMapper = new ObjectMapper(); var seat = objectMapper.readValue(record.value(), Seat.class); if (null != seat.getType() && seat.getType().equalsIgnoreCase("compensation")) { seatService.unlockSeat(seat.getId()); } else { seatService.busySeat(seat.getId()); } } }
En el fragmento anterior vamos a recibir mensajes mediante la anotación @Incoming, con la que le decimos a Quarkus mediante configuración, el topic, el conector, el deserializador etc…. Además podemos ver como, vamos a recibir mensajes para compensar y liberar el asiento bloqueado o finalizar la transacción y ocuparlo. Este último paso solo será posible si la transacción se ha realizado con éxito por todos los microservicios que ha pasado.
Creación de Payment Service
Dentro de nuestra arquitectura vamos a tener un microservicio que se va a encargar de gestionar y realizar los pagos del usuario que ha reservado el asiento.
Este microservicio va a recibir una notificación a través de un evento a un topic al que estará subscrito, una vez que lo recibá comenzará los pasos para realizar el pago, si hay algún problema durante este paso se compensará al servicio order service mediante un mensaje.
Objecto Payment
Este va a ser el objeto Payment que vamos a guardar en nuestra base de datos. Como curiosidad indicar que vamos a hacer soft delete de los pagos que den error, es decir, en la compensación no vamos a borrar el pago, sino que cambiaremos su estado para tener un histórico de los pagos que han fallado.
Para realizar el soft delete haremos uso de la anotación @SQLDelete que nos proporciona Hibernate.
@Getter @Setter @Entity @SQLDelete(sql = "UPDATE Payment SET status=deleted WHERE id = ?") @ToString public class Payment { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String status; @OneToOne private User user; @OneToOne private Seat seat; @JsonDeserialize(using = LocalDateDeserializer.class) @JsonSerialize(using = LocalDateSerializer.class) private LocalDate date; private BigDecimal amount; }
Creación del consumidor de kafka con Quarkus
El primer paso que vamos a realizar es crear un consumidor que se conecte al topic de kafka donde los mensajes son enviados para poder crear un pago:
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class PaymentConsumer { private final MakePaymentUseCase makePaymentUseCase; @SneakyThrows @Incoming("payments-in") public void receive(Record<Long, String> record) { log.info("record key: {}", record.key()); ObjectMapper objectMapper = new ObjectMapper(); var seat = objectMapper.readValue(record.value(), Seat.class); makePaymentUseCase.makeAPayment(seat); } }
La creación del consumidor es igual que en el microservicio order-service, en el mediante la anotación @Incoming le indicamos la configuración a Quarkus en el application.properties para que se conecte a Kafka.
En el fragmento de código recibimos un objeto seat y a partir de ahí, creamos un pago con el realizaremos otra transacción hasta el microservicio de allocate-service.
Creación del pago y de la transacción
Una vez recibido el asiento bloqueado por un usuario, vamos a realizar el pago con lo que cambiará el estado y envíaremos una notificación al servicio allocate-service para cambiar el asiento a ocupado. O en el caso en el que falle la creación del pago, realizaremos una compensación a order-service para liberar el asiento.
public Payment makeAPayment(Seat seat) { log.info("Create payment with seat {}", seat.getId()); var payment = createPayment(seat); try { paymentService.savePayment(payment); }catch (Exception ex) { seatEventProducer.sendSeatEvent(payment.getSeat()); return payment; } paymentProducer.sendPaymentEvent(payment); return payment; }
@ApplicationScoped @Slf4j @RequiredArgsConstructor public class PaymentProducer { @Channel("payments-out") Emitter<Record<Long, String>> emitter; private final SeatEventProducer seatEventProducer; private final DeletePaymentUseCase deletePaymentUseCase; @SneakyThrows public void sendPaymentEvent(Payment payment) { log.info("Event sent {}", payment.getId()); ObjectMapper objectMapper = new ObjectMapper(); var paymentJson = objectMapper.writeValueAsString(payment); emitter.send(Record.of(payment.getId(), paymentJson)) .whenComplete((success, failure) -> { if (failure != null) { log.error("D'oh! " + failure.getMessage()); seatEventProducer.sendSeatEvent(payment.getSeat()); deletePaymentUseCase.deletePayment(payment.getId()); } else { log.info("Message processed successfully"); } }); } }
Hay que tener en cuenta que en el código anterior envíamos un pago realizado para realizar una actualización del asiento, pero en el caso en el que el mensaje no se pueda entregar y no sea confirmada su entregar haremos un rollback, es decir, envíaremos una notificación de liberar el asiento a order-service, y borraremos el pago realizado. Es por esa razón que utilizamos .whenComplete, con lo que sabremos si ha habido algún error en la entrega.
Para poder conectar en Quarkus con el topic correspondiente y la configuración de kafka
Creación de las compensaciones
En el caso de payment-service vamos a tener compensaciones para consumir y para producir. Es decir, por un lado vamos a notificar a order-service cualquier problema para liberar el asiento, y por otro lado vamos a consumir los mensajes que nos lleguen desde allocate para realizar la compensación en payment y propagarlo hasta order-service.
Vamos a ver en primer lugar la compensación que se envia a order-service:
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class SeatEventProducer { @Inject @Channel("seats-out") Emitter<Record<Long, String>> emitter; @SneakyThrows public void sendSeatEvent(Seat seat) { log.info("Sent event to order service to compensate with seatId {}", seat.getId()); seat.setType("compensation"); ObjectMapper objectMapper = new ObjectMapper(); var seatJson = objectMapper.writeValueAsString(seat); emitter.send(Record.of(seat.getId(), seatJson)) .whenComplete((success, failure) -> { if (failure != null) { log.error("D'oh! " + failure.getMessage()); } else { log.info("Message processed successfully"); } }); } }
Y a continuación vamos a crear el consumidor de la compensación que será enviado a payment para que efectue un rollback del pago en el caso en el que una transacción falle. Por un lado habría que realizar la eliminación del pago, y por otro lado enviar una notificación a order service para que se encargué de volver a establecer a libre el asiento reservado:
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class PaymentConsumerCompensation { private final DeletePaymentUseCase deletePaymentUseCase; private final SeatEventProducer seatEventProducer; @SneakyThrows @Incoming("allocate-in") public void receive(Record<Long, String> record) { log.info("Payment compensation with key {}", record.key()); ObjectMapper objectMapper = new ObjectMapper(); var payment = objectMapper.readValue(record.value(), Payment.class); deletePaymentUseCase.deletePayment(record.key()); seatEventProducer.sendSeatEvent(payment.getSeat()); } }
Creación de Allocate Service
Este servicio será el encargado de recibir las notificaciones desde payment una vez el pago ha sido realizado con éxito. Este microservicio será el responsable de finalizar la transacción actualizando el estado del asiento a ocupado y envíando una notificación a order-service para que también actulicé el estado, y además será el microservicio de consulta de los estados de los asientos. Al igual que ocurría con Payments, en el caso en el que al actualizar el estado de algún error una compensación será propagada para deshacer el pago y actualizar el asiento a libre.
En este microservicio vamos a diferenciar entre consumidores de eventos y productores:
Consumidor de kafka en Allocate Service
Nuestro microservicio Allocate-service tendrá un único consumidor, el cual se va a encargar de recibir los eventos que llegan desde Payment informando que el pago ha sido realizado.
Una vez el evento llega de manera correcta el servicio actualizará el estado del asiento (seat), haciendo una petición a updateSeat:
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class AllocateConsumer { private final AllocateUseCase allocateUseCase; @SneakyThrows @Incoming("allocates-in") public void receive(Record<Long, String> record) { log.info("Event received with key: {}", record.key()); ObjectMapper objectMapper = new ObjectMapper(); var payment = objectMapper.readValue(record.value(), Payment.class); var seat = payment.getSeat(); allocateUseCase.updateSeat(seat); } }
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class AllocateUseCase { private final SeatService seatService; private final PaymentProducerCompensation paymentProducerCompensation; private final AllocateProducer allocateProducer; public Seat updateSeat(Seat seat) { log.info("Save seat {}", seat.toString()); try { seatService.updateSeat(seat); allocateProducer.sendSeatEvent(seat); } catch (Exception ex) { paymentProducerCompensation.sendPaymentEvent(seat.getPayment()); } return seatService.findById(seat.getId()); }
En el código anterior podemos ver como realizamos la actualización del evento y en el caso en el que no exista ningún error envíamos un mensaje para notificar el cambio, si hay algún error se notifica a Payment para que deshaga los cambios.
Finalmente en la parte de consumo del mensaje vamos a ver el código para realizar la actualización. Para la gestión con la base de datos hemos utilizado PanacheRepository, y haciendo uso de esta librería hacemos una actualización del asiento a OCCUPIED.
@ApplicationScoped @RequiredArgsConstructor @Slf4j public class SeatService { private final SeatRepository seatRepository; @Transactional(REQUIRES_NEW) public void updateSeat(Seat seat) { log.info("Block a seat {}", seat.toString()); seatRepository.update("status = 'OCCUPIED' where id = ?1", seat.getId()); } public Seat findById(Long id) { return seatRepository.findById(id); } }
Productor de Kafka con Quarkus en Allocate Service
Este microservicio tendrá dos productores que se encargará uno de enviar una notificación a Order Service para actualizar el estado del asiento en ese servicio, y otro para realizar una compensación del pago en el caso en el que algún error exista.
El siguiente fragmento de código va a enviar un mensaje a order service para realizar una actualización del status de seat en ese servicio. Como hemos visto anteriormente haremos uso de la anotación de Quarkus @Channel con la que proporcionaremos la configuración de kafka que se encuentra en el application.properties de nuestra aplicación.
@ApplicationScoped @Slf4j public class AllocateProducer { @Channel("seats-out") Emitter<Record<Long, String>> emitter; @SneakyThrows public void sendSeatEvent(Seat seat) { log.info("Event sent seat with id{}", seat.getId()); ObjectMapper objectMapper = new ObjectMapper(); var seatJson = objectMapper.writeValueAsString(seat); emitter.send(Record.of(seat.getId(), seatJson)) .whenComplete((success, failure) -> { if (failure != null) { log.error("D'oh! " + failure.getMessage()); } else { log.info("Message processed successfully"); } }); } }
En el caso en el que la actualización del estado del asiento tenga algún error, será creado un evento de compensación a Payment, tal y como hemos visto en el fragmento de código de actualización del estado (AllocateUseCase).
Para enviar este evento haremos uso del channel payments-out, para enviar un mensaje al topic al que también payment se encuentra subscrito:
package com.refactorizando.sample.saga.events.compensation; import com.fasterxml.jackson.databind.ObjectMapper; import com.refactorizando.sample.saga.model.Payment; import com.refactorizando.sample.saga.model.Seat; import com.refactorizando.sample.saga.service.PaymentService; import io.smallrye.reactive.messaging.kafka.Record; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.eclipse.microprofile.reactive.messaging.Incoming; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; @ApplicationScoped @RequiredArgsConstructor @Slf4j public class PaymentProducerCompensation { @Channel("payments-out") Emitter<Record<Long, String>> emitter; @SneakyThrows public void sendPaymentEvent(Payment payment) { log.info("Event sent {}", payment.getId()); ObjectMapper objectMapper = new ObjectMapper(); var paymentJson = objectMapper.writeValueAsString(payment); emitter.send(Record.of(payment.getId(), paymentJson)) .whenComplete((success, failure) -> { if (failure != null) { log.error("D'oh! " + failure.getMessage()); } else { log.info("Message processed successfully"); } }); } }
Notas e información sobre Quarkus y Kafka.
- Hemos hecho uso de la anotación @QuarkusMain para arrancar nuestro proyecto desde nuestro IDE, aunque con Quarkus no es necesario, ya que desde maven te genera el proyecto y se puede lanzar.
- Los proyectos tienen las mismas librerías por lo que se podrán crear de la misma manera.
- Los tres servicios hacen uso del consumidor de kafka a través de la anotación @Incoming, que nos permitirá definir la configuración en el application.properties.
- Todos los servicios hacen uso del productor de Kafka a través de la anotación @Channel que nos permitirá crear la configuración necesaria en el application.properties. Y de esta manera poder emitir un evento.
- En todos los proyectos se hace uso de Lombok para evitar el boilerplate de Java.
- La Inyección de dependencias se hace a través del constructor haciendo uso de Lombok.
- Hermos levantado Kafka haciendo uso de Docker.
- Si quieres probar en local asegúrate de tener cada servicio en un puerto diferente.
Conclusión
En esta entrada sobre Patrón Saga con Quarkus y Kafka, hemos visto como implementar y desarrollar desde 0 y paso a paso una arquitectura basada en transacciones.
Este patrón de diseño se hace imprescindible en arquitecturas orientadas a microservicios cuando es necesario hacer uso de transacciones y no podemos hacer uso de 2PC (Two Phases Commit). Con lo que nos aseguraremos que nuestras transacciones son procesadas correctamente.
Si necesitas más información puedes escribirnos un comentario o un correo electrónico a refactorizando.web@gmail.com y te ayudaremos encantados!