In this new post, we will see a practical example of the Saga Pattern with Quarkus and Kafka.
Kafka, a distributed streaming platform, enables asynchronous messaging between services and offers a powerful set of features. It is highly scalable and fault-tolerant thanks to its replication and partition-leader model.
In this article, we will build a Saga-based architecture in which the different services communicate asynchronously through Kafka messages. To illustrate the concepts, we will create three services that model seat selection in a cinema.
What is the Saga Pattern?
As we saw in a previous Refactorizando article, we can define the Saga Pattern as a sequence of local transactions between microservices. Unlike the 2PC (Two-Phase Commit) pattern, the work is not performed in a single atomic request, and a participant cannot revert the other participants' transactions on its own; instead, it has to notify each participant that the transaction needs to be reversed so that each one can compensate.
A local transaction is the unit of work of each participant in our Saga architecture. Each operation performed by the services must allow for compensation, i.e., a rollback. Additionally, we must always verify that our transactions have completed successfully; otherwise, we perform the compensation.
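As a rough conceptual sketch (not part of the example code), each saga step can be modeled as a local transaction paired with the compensating action that undoes it:

// Conceptual sketch only, not part of the example code: a saga step pairs a
// local transaction with the compensating action that reverses its effects.
public interface SagaStep<T> {

    // Local transaction executed by this participant.
    void execute(T payload);

    // Compensation invoked when a later participant reports a failure.
    void compensate(T payload);
}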
Hands On with Quarkus and Apache Kafka
We will now build an example of the Saga Pattern with Quarkus and Kafka: a user selects a seat to watch a movie, the seat is blocked, an event is sent to make the payment, and finally the seat allocation is updated. If an error occurs at any point in the process, compensation is triggered to roll back and leave the system in its previous state. The microservices will be Order, Payment, and Allocate.
If you want to see the example directly, you can check it out on our GitHub.
A saga can be implemented through an orchestrator or through choreography. In our example, we have opted for choreography.
The flow of our system is shown below.
In the following diagram, we will see through which topics the messages from producers and consumers will be notified in our solution.
Each service will have its own in-memory H2 database, which we will manage using Hibernate Panache.
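As an illustrative sketch (the real entities and repositories live in the GitHub repository), a seat entity and its Panache repository could look roughly like this:

// Illustrative sketch; the actual model is in the GitHub repository.
// Each class goes in its own file.
import io.quarkus.hibernate.orm.panache.PanacheRepository;

import javax.enterprise.context.ApplicationScoped;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class Seat {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    public Long id;

    public String status;
}

@ApplicationScoped
public class SeatRepository implements PanacheRepository<Seat> {
    // persist, findById, update, etc. are inherited from PanacheRepository.
}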
Next, we will define the necessary configuration to add Kafka and our H2 database in order to continue defining each service.
Configuration of our services in Quarkus
Next, we will add the basic configuration for the three services we are going to create. To do this, we will add the necessary dependencies in the pom.xml file of each service:
Maven Dependencies in Quarkus with Kafka
To start with the creation of our example, we will begin by adding quarkus-smallrye-reactive-messaging-kafka and quarkus-resteasy-jackson to our pom.
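As a reference, and assuming the versions are managed by the Quarkus BOM, the relevant entries in the pom.xml would look something like this:

<!-- Sketch of the relevant dependencies; versions are managed by the Quarkus BOM -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>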
Alternatively, we can go to the Quarkus website and generate the project using the following link, https://code.quarkus.io/. Finally, we can also create our project using Maven to generate the archetype with the necessary dependencies.
mvn io.quarkus:quarkus-maven-plugin:2.0.1.Final:create \
    -DprojectGroupId=com.refactorizando.sample.saga \
    -DprojectArtifactId=order-service \
    -DclassName="com.refactorizando.sample.saga.OrderResource" \
    -Dpath="/orders" \
    -Dextensions="resteasy,smallrye-reactive-messaging-kafka,jdbc-h2"
We will perform this process three times to create our three services (Order, Payment, and Allocate).
Configuring Quarkus with Kafka
We will now add the Quarkus configuration needed to connect to Kafka and its topics. First, we will start a Kafka broker with Docker:
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=localhost landoop/fast-data-dev:latest
In our properties file in the resources folder (application.properties), we will add our Kafka configuration. To define the configuration, we will differentiate between consumer and producer through outgoing and incoming.
As it is a Saga architecture, our services will perform compensation through messages, so they will be both producers and consumers.
For example, the configuration for our Order service will be:
# The Kafka broker location (defaults to localhost:9092)
kafka.bootstrap.servers=localhost:9092

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.seats-in.connector=smallrye-kafka
mp.messaging.incoming.seats-in.topic=status-update
mp.messaging.incoming.seats-in.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
mp.messaging.incoming.seats-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.seats-out.connector=smallrye-kafka
mp.messaging.outgoing.seats-out.topic=seats
mp.messaging.outgoing.seats-out.key.serializer=org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.seats-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
In the configuration, we can see that we have a producer, which will send a message to the payment service, and a consumer that will receive both the compensation and the update of the selected seat. The above configuration belongs to the Order-Service.
Additionally, we can see the serialization and deserialization to be used, as well as the Kafka address that we have set up using our Docker image.
Quarkus Configuration with H2
In our example, we are going to use an in-memory database with Hibernate Panache. For that, we will make the following configuration in our properties file (application.properties).
# Configuring the database
quarkus.datasource.db-kind=h2
quarkus.datasource.username=sa
quarkus.datasource.password=password
quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb
quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.hibernate-orm.log.sql=true
Additionally, we are going to load an SQL script to populate our database automatically when the application starts. For that, we will add the following line:
quarkus.hibernate-orm.sql-load-script=load-data.sql
And in order to have data in our database, we will write the following inserts in each service:
INSERT INTO USER(id, name, surname, email) VALUES(1, 'noel', 'rodriguez', 'refactorizando.web@gmail.com');
INSERT INTO SEAT(id, column, row, status, user_id) VALUES(1, 1, 1, 'FREE', 1);
Order Service creation
Once we have added and created our projects with the basic configuration, and have understood how to define the configuration for Kafka and H2, we will start creating our Saga pattern.
To begin, we will create our first service, the Order Service, which will also serve as the entry point for a user. For this, we will define the necessary code for its implementation, skipping some steps for simplicity (such as model definition or repository methods and implementation). If needed, you can see the complete code and full implementation by clicking here.
This service will be responsible for reserving a seat for a customer, and the seat’s status will change to “blocked”. Once the status change occurs (event driven), we will send an event to the Payment service. If the Payment service cannot process the payment or encounters an error, it will initiate compensation and release the seat. Let’s see the steps in more detail, divided into the transaction and the compensation:
Transaction creation
We create our entry endpoint, which will be a POST request with a seat and a user:
{ "id":1, "column": 1, "row":1, "user":{ "id":1, "name":"noel" } }
@POST
public Response orderSeat(Seat seat) {

    log.info("New order received");

    return Response.status(Response.Status.CREATED)
        .entity(service.reservedSeat(seat))
        .build();
}
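For example, assuming the service runs on the default port 8080 and the resource is mapped to /orders, the request could be sent like this:

# Example request; host, port, and path depend on your local setup
curl -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"id":1,"column":1,"row":1,"user":{"id":1,"name":"noel"}}'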
This request will update the seat’s status to ‘blocked’ in the database, and when the status changes, an event will be triggered.
public Seat reservedSeat(Seat seat) {

    log.info("Update seat {}", seat.getId());

    var seatToSave = seatService.lockSeat(seat.getId());

    seatEventProducer.sendOrder(seatToSave);

    return seatToSave;
}
In the previous code snippet, we have updated the seat’s status to ‘blocked’ and sent an event. Now let’s see how to create an event to send a message in Quarkus to Kafka, which will be responsible for notifying it to Payment:
@ApplicationScoped
@Slf4j
public class SeatEventProducer {

    @Inject
    @Channel("seats-out")
    Emitter<Record<Long, String>> emitter;

    @SneakyThrows
    public void sendOrder(Seat seat) {

        log.info("Event sent {}", seat.getId());

        ObjectMapper objectMapper = new ObjectMapper();
        var seatJson = objectMapper.writeValueAsString(seat);

        emitter.send(Record.of(seat.getId(), seatJson))
            .whenComplete((success, failure) -> {
                if (failure != null) {
                    log.error("D'oh! " + failure.getMessage());
                } else {
                    log.info("Message processed successfully");
                }
            });
    }
}
The previous class is responsible for sending a message. In this case, we are going to send a seat object.
To send a message to Kafka in Quarkus, we need to inject the Emitter dependency, which will send messages to the ‘seats-out’ channel. This channel has been configured in our application.properties.
We will send Record objects, composed of the seat’s id as the key and the serialized seat as the value.
Creating Compensation
As we mentioned before, in this type of architecture, we need to ensure that the transaction has been completed; otherwise, we need to have a compensation system.
In our example, we will perform compensation when a payment cannot be made or when a seat update cannot be performed. For these two cases, we will create a consumer that will be responsible for performing these steps:
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class SeatEventConsumer {

    private final SeatService seatService;

    @SneakyThrows
    @Incoming("seats-in")
    public void receive(Record<Long, String> record) {

        log.info("record key is: {}", record.key());

        ObjectMapper objectMapper = new ObjectMapper();
        var seat = objectMapper.readValue(record.value(), Seat.class);

        if (null != seat.getType() && seat.getType().equalsIgnoreCase("compensation")) {
            seatService.unlockSeat(seat.getId());
        } else {
            seatService.busySeat(seat.getId());
        }
    }
}
In the previous snippet, we will receive messages using the @Incoming annotation, which tells Quarkus, through configuration, the topic, connector, deserializer, etc. Additionally, we can see how we will receive messages to compensate and release the blocked seat or finalize the transaction and occupy it. The latter step will only be possible if the transaction has been successfully completed by all the microservices it has passed through.
Creating the Payment Service
Within our architecture, we will have a microservice that will handle and process payments for the user who has reserved the seat.
This microservice will receive a notification through an event on a subscribed topic. Once received, it will begin the steps to process the payment. If there is any issue during this step, a message will be sent to the order service for compensation.
Payment Object
This is the Payment object that we will store in our database. As a note of interest, we will perform a soft delete on payments that encounter errors. This means that during compensation, we will not delete the payment but rather change its status to maintain a history of failed payments.
To perform the soft delete, we will utilize the @SQLDelete annotation provided by Hibernate.
@Getter
@Setter
@Entity
@SQLDelete(sql = "UPDATE Payment SET status = 'deleted' WHERE id = ?")
@ToString
public class Payment {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String status;

    @OneToOne
    private User user;

    @OneToOne
    private Seat seat;

    @JsonDeserialize(using = LocalDateDeserializer.class)
    @JsonSerialize(using = LocalDateSerializer.class)
    private LocalDate date;

    private BigDecimal amount;
}
Creating Kafka Consumer with Quarkus
The first step we will take is to create a consumer that connects to the Kafka topic where messages are sent in order to create a payment:
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class PaymentConsumer {

    private final MakePaymentUseCase makePaymentUseCase;

    @SneakyThrows
    @Incoming("payments-in")
    public void receive(Record<Long, String> record) {

        log.info("record key: {}", record.key());

        ObjectMapper objectMapper = new ObjectMapper();
        var seat = objectMapper.readValue(record.value(), Seat.class);

        makePaymentUseCase.makeAPayment(seat);
    }
}
The consumer creation is the same as in the order-service microservice, where we use the @Incoming annotation to indicate the configuration to Quarkus in the application.properties file to connect to Kafka.
In the code snippet, we receive a seat object and create a payment that initiates another transaction to the allocate-service microservice.
Payment and transaction creation
Once we receive the seat locked by a user, we will proceed with the payment, which will change the status, and we will send a notification to the allocate-service to mark the seat as occupied. In case the payment creation fails, we will perform a compensation to the order-service to release the seat.
public Payment makeAPayment(Seat seat) {

    log.info("Create payment with seat {}", seat.getId());

    var payment = createPayment(seat);

    try {
        paymentService.savePayment(payment);
    } catch (Exception ex) {
        seatEventProducer.sendSeatEvent(payment.getSeat());
        return payment;
    }

    paymentProducer.sendPaymentEvent(payment);

    return payment;
}
@ApplicationScoped
@Slf4j
@RequiredArgsConstructor
public class PaymentProducer {

    @Channel("payments-out")
    Emitter<Record<Long, String>> emitter;

    private final SeatEventProducer seatEventProducer;

    private final DeletePaymentUseCase deletePaymentUseCase;

    @SneakyThrows
    public void sendPaymentEvent(Payment payment) {

        log.info("Event sent {}", payment.getId());

        ObjectMapper objectMapper = new ObjectMapper();
        var paymentJson = objectMapper.writeValueAsString(payment);

        emitter.send(Record.of(payment.getId(), paymentJson))
            .whenComplete((success, failure) -> {
                if (failure != null) {
                    log.error("D'oh! " + failure.getMessage());
                    seatEventProducer.sendSeatEvent(payment.getSeat());
                    deletePaymentUseCase.deletePayment(payment.getId());
                } else {
                    log.info("Message processed successfully");
                }
            });
    }
}
It’s important to note that in the previous code we send a payment event so that the seat can be updated. However, if the message cannot be delivered and its delivery is not confirmed, we perform a rollback: we notify order-service to release the seat and delete the payment that was just created. That is why we use .whenComplete, which lets us determine whether there was any error in the delivery.
To connect to the corresponding topics, payment-service needs its own channel configuration in application.properties, analogous to the one we defined for order-service.
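A possible sketch of that configuration is shown below. The channel names match the code in this section; the topic names for the payment and allocate compensation channels are assumptions and must match whatever topics the other services use:

# Sketch only: channel names come from the code in this article,
# some topic names are assumptions.
kafka.bootstrap.servers=localhost:9092

# Seats blocked by order-service (consumer)
mp.messaging.incoming.payments-in.connector=smallrye-kafka
mp.messaging.incoming.payments-in.topic=seats
mp.messaging.incoming.payments-in.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
mp.messaging.incoming.payments-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Payments sent to allocate-service (producer); topic name assumed
mp.messaging.outgoing.payments-out.connector=smallrye-kafka
mp.messaging.outgoing.payments-out.topic=payments
mp.messaging.outgoing.payments-out.key.serializer=org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.payments-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Compensation events sent back to order-service (producer)
mp.messaging.outgoing.seats-out.connector=smallrye-kafka
mp.messaging.outgoing.seats-out.topic=status-update
mp.messaging.outgoing.seats-out.key.serializer=org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.seats-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Compensation events received from allocate-service (consumer); topic name assumed
mp.messaging.incoming.allocate-in.connector=smallrye-kafka
mp.messaging.incoming.allocate-in.topic=allocate-compensation
mp.messaging.incoming.allocate-in.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
mp.messaging.incoming.allocate-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer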
Compensation creation
In the case of payment-service, we will have compensations on both the producer and the consumer side. On one hand, we will notify order-service about any issue so it can release the seat. On the other hand, we will consume compensation messages coming from allocate-service, undo the payment, and propagate the compensation up to order-service.
Let’s first take a look at the compensation sent to order-service:
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class SeatEventProducer {

    @Inject
    @Channel("seats-out")
    Emitter<Record<Long, String>> emitter;

    @SneakyThrows
    public void sendSeatEvent(Seat seat) {

        log.info("Sent event to order service to compensate with seatId {}", seat.getId());

        seat.setType("compensation");

        ObjectMapper objectMapper = new ObjectMapper();
        var seatJson = objectMapper.writeValueAsString(seat);

        emitter.send(Record.of(seat.getId(), seatJson))
            .whenComplete((success, failure) -> {
                if (failure != null) {
                    log.error("D'oh! " + failure.getMessage());
                } else {
                    log.info("Message processed successfully");
                }
            });
    }
}
Next, we will create the compensation consumer in payment-service, which receives the rollback request from allocate-service when a transaction fails. We delete the payment and notify order-service to make the reserved seat available again.
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class PaymentConsumerCompensation {

    private final DeletePaymentUseCase deletePaymentUseCase;

    private final SeatEventProducer seatEventProducer;

    @SneakyThrows
    @Incoming("allocate-in")
    public void receive(Record<Long, String> record) {

        log.info("Payment compensation with key {}", record.key());

        ObjectMapper objectMapper = new ObjectMapper();
        var payment = objectMapper.readValue(record.value(), Payment.class);

        deletePaymentUseCase.deletePayment(record.key());

        seatEventProducer.sendSeatEvent(payment.getSeat());
    }
}
Allocate Service Creation
This service will be responsible for receiving notifications from payment-service once the payment has been successfully made. This microservice will handle finalizing the transaction by updating the seat status to occupied and sending a notification to order-service to update its status as well. Additionally, it will serve as the seat status query microservice. Similar to Payments, if there is an error updating the status, a compensation will be propagated to undo the payment and update the seat status to available.
In this microservice, we will differentiate between event consumers and producers:
Kafka Consumer in Allocate Service
The Allocate-service microservice will have a single consumer that receives events from Payment indicating successful payment.
Once the event is received successfully, the service will update the seat status by making a request to updateSeat:
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class AllocateConsumer {

    private final AllocateUseCase allocateUseCase;

    @SneakyThrows
    @Incoming("allocates-in")
    public void receive(Record<Long, String> record) {

        log.info("Event received with key: {}", record.key());

        ObjectMapper objectMapper = new ObjectMapper();
        var payment = objectMapper.readValue(record.value(), Payment.class);
        var seat = payment.getSeat();

        allocateUseCase.updateSeat(seat);
    }
}
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class AllocateUseCase {

    private final SeatService seatService;

    private final PaymentProducerCompensation paymentProducerCompensation;

    private final AllocateProducer allocateProducer;

    public Seat updateSeat(Seat seat) {

        log.info("Save seat {}", seat.toString());

        try {
            seatService.updateSeat(seat);
            allocateProducer.sendSeatEvent(seat);
        } catch (Exception ex) {
            paymentProducerCompensation.sendPaymentEvent(seat.getPayment());
        }

        return seatService.findById(seat.getId());
    }
}
In the previous code, we can see how we perform the event update. If there are no errors, we send a message to notify the change. However, if there is an error, we notify Payment to undo the changes.
Finally, in the message consumption part, let’s take a look at the code for the update. For database management, we have used PanacheRepository, and using this library, we update the seat to OCCUPIED.
@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class SeatService {

    private final SeatRepository seatRepository;

    @Transactional(REQUIRES_NEW)
    public void updateSeat(Seat seat) {

        log.info("Block a seat {}", seat.toString());

        seatRepository.update("status = 'OCCUPIED' where id = ?1", seat.getId());
    }

    public Seat findById(Long id) {
        return seatRepository.findById(id);
    }
}
Kafka Producer with Quarkus in Allocate Service
This microservice will have two producers. One will send a notification to Order Service to update the seat status in that service, and the other will perform a compensation of the payment in case of any errors.
The following code snippet will send a message to Order Service to update the seat status in that service. As we have seen before, we will use the Quarkus annotation @Channel, which allows us to provide the Kafka configuration located in the application.properties file of our application.
@ApplicationScoped
@Slf4j
public class AllocateProducer {

    @Channel("seats-out")
    Emitter<Record<Long, String>> emitter;

    @SneakyThrows
    public void sendSeatEvent(Seat seat) {

        log.info("Event sent seat with id {}", seat.getId());

        ObjectMapper objectMapper = new ObjectMapper();
        var seatJson = objectMapper.writeValueAsString(seat);

        emitter.send(Record.of(seat.getId(), seatJson))
            .whenComplete((success, failure) -> {
                if (failure != null) {
                    log.error("D'oh! " + failure.getMessage());
                } else {
                    log.info("Message processed successfully");
                }
            });
    }
}
If an error occurs while updating the seat status, a compensation event is created for Payment, as seen in the code snippet for seat status update (AllocateUseCase).
To send this event, we use the payments-out channel to send a message to the topic that Payment is subscribed to as well:
package com.refactorizando.sample.saga.events.compensation;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.refactorizando.sample.saga.model.Payment;
import com.refactorizando.sample.saga.model.Seat;
import com.refactorizando.sample.saga.service.PaymentService;
import io.smallrye.reactive.messaging.kafka.Record;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
@RequiredArgsConstructor
@Slf4j
public class PaymentProducerCompensation {

    @Channel("payments-out")
    Emitter<Record<Long, String>> emitter;

    @SneakyThrows
    public void sendPaymentEvent(Payment payment) {

        log.info("Event sent {}", payment.getId());

        ObjectMapper objectMapper = new ObjectMapper();
        var paymentJson = objectMapper.writeValueAsString(payment);

        emitter.send(Record.of(payment.getId(), paymentJson))
            .whenComplete((success, failure) -> {
                if (failure != null) {
                    log.error("D'oh! " + failure.getMessage());
                } else {
                    log.info("Message processed successfully");
                }
            });
    }
}
Notes and Information about Saga Pattern with Quarkus and Kafka
- You can start a Quarkus project using the @QuarkusMain annotation or generate it from Maven, eliminating the need for IDE launch.
- You can create the projects in the same way since they share the same libraries.
- All services use Kafka consumer with @Incoming annotation for configuration defined in application.properties.
- Services utilize Kafka producer with @Channel annotation for necessary configuration in application.properties file. This enables us to emit an event.
- All projects use Lombok to avoid Java boilerplate code.
- Dependency injection is done through constructors generated by Lombok's @RequiredArgsConstructor.
- We have set up Kafka using Docker.
- If you want to test locally, make sure each service runs on a different port; see the example after this list.
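For example, assuming the default ports are free, each service could set its own quarkus.http.port in its application.properties (the exact values are only a suggestion):

# Hypothetical port assignment, one line in each service's application.properties
# order-service
quarkus.http.port=8080
# payment-service
quarkus.http.port=8081
# allocate-service
quarkus.http.port=8082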
Conclusion
In this post about the Saga Pattern with Quarkus and Kafka, we have seen how to implement and develop an architecture based on transactions from scratch and step by step.
This design pattern helps keep data consistent across microservices when distributed transactions are needed and 2PC is not viable.
If you need more information, you can leave us a comment or send an email to refactorizando.web@gmail.com. You can also contact us through our social media channels on Facebook or Twitter, and we will be happy to assist you!