Kafka Stream

Kafka Stream


En un artículo anterior vimos el uso de Kafka Stream con un Schema Registry haciendo uso de Avro, en esta nueva entrada vamos a ver el funcionamiento de KStream en Kafka Stream con Spring Boot.

Kafka Streams nos ofrece una librería para el lado cliente para poder trabajar y gestionar los streams. Muchas de las arquitecturas que actualmente se usan hacen uso de esta librería para poder trabajar con Streams de Datos. Por ejemplo un uso extendido puede ser para realizar migraciones de Bases de Datos, haciendo uso de Change Data Capture (CDC), para el procesamiento en tiempo real de datos.

Podríamos decir que Kafka Streams nos ofrece una aproximación entre eventos y tablas como si de una base de datos relacional se tratará. Nos va a permitir hacer operaciones como join, aggregation, filter, group de los eventos que se van a procesar.

Processor Topology

Uno de los conceptos más importantes dentro de Kafka Streams es el concepto de Processor Topology. El Proccessor Topology o la topología del procesador es el modelo de las operaciones de Kafka Stream en uno o más flujos de eventos.

El Processor Topology es considerado como un grafo acíclico. En estos grafos los nodos son el source, el processor y el sink, mientras que las aristas son el flujo de los eventos que ocurren. Esencialmente, la topología del procesador se puede considerar como un gráfico acíclico dirigido. En este gráfico, los nodos se clasifican en source, processor y sink, mientras que los bordes representan el flujo de los eventos de flujo.

El nodo source, en la parte superior es el encargado de recibir los datos provenientes de Kafka, se los envía al nodo processor, y a continuación va a los nodos sink para acabar en un nuevo topic de Kafka. A lo largo del proceso el estado del stream es guardado periódicamente a través de check points para mantener la resiliencia y la tolerancia a fallos.

¿Qué es KStream?

Como ya hablamos en un artículo anterior sobre Kafka Stream, podríamos decir que KStream es una abstracción de un registro de un stream. En el que cada registro de datos, representa un dato independiente en todo el conjunto de datos ilimitados.

Ejemplo de KTable con Spring Cloud Stream

A continuación vamos a diseñar un ejemplo en el que podamos ver un flujo completo de Kafka Stream haciendo uso de KStream.

Nuestro ejemplo va a consistir en leer un stream de números de un topic de Kafka. A continuación y tras ser leído procesamos ese stream saber si el número es par o impar y lo almacenamos haciendo uso de KeyValueBytesStoreSupplier.

Por lo que nuestro ejemplo va a constar de un consumidor y un productor, en el que estableceremos diferentes configuraciones.

Vamos a comenzar viendo las configuraciones de nuestro Productor y Consumidor. En nuestro ejemplo hemos hecho uso de Java 17 pero con versiones anteriores de Java también funcionará.

Si quieres echar un ojo directamente al ejemplo lo puedes hacer aquí.

Configuración Maven de Productor de Kafka Stream con Spring Cloud Stream

Lo primero que vamos a hacer es añadir las dependencias Maven imprescindibles para el productor:

<dependencies>
  <dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-stream-dependencies</artifactId>
				<version>${spring-cloud-stream.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

Con la dependencia de spring-cloud-starter-stream-kafka, vamos a añadir la autoconfiguración de Spring para poder producir streams de Kafka. Por otro lado para la serialización de datos vamos a hacer uso de Jackson.

Además tendremos que tener en cuenta el añadir como dependency management, las dependencias como pom de spring cloud stream.

Configuración de las propiedades de un Productor de Kafka Stream con Spring Cloud Stream

Vamos a hacer uso de programación funcional por lo que en nuestro fichero de propiedades además tendremos que realizar alguna configuración para indicar donde se encuentran nuestros métodos.

spring:
  application:
    name: send-number
  cloud:
    stream:
      function:
        definition: sendNumber;
      bindings:
        sendNumber-out-0.destination: number
      kafka:
        bindings.sendNumber-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer
        bootstrap-servers: localhost:9092
  • bootstrap-servers: Indica la dirección de kafka
  • function.definition: Indica donde se encuentra el método supplier encargado de emitir eventos.
  • bindings: Esta propiedad tendrá que llevar un nombre acompañada de out-0.destination, e indica el topic.
  • serializer: El serializador que se usará en este caso usamos kafka.common.serialization.

Creador de un Productor con con Spring Cloud Stream

Para crear nuestro productor con Kafka Stream y Spring Cloud Stream vamos a hacer uso de programación funcional, por lo que nuestro productor hará uso de la Interfaz Supplier para enviar los eventos. El nombre del método encargado de enviar los eventos tiene que ser definido en nuestro application.yml en la propiedad spring.cloud.stream.function.definition; y además también tendrá el nombre del topic.

@Slf4j
@Configuration
public class NumberProducer {

  Random r = new Random();
  int lowerBound = 1;
  int upperBound = 100000000;
  
  private static long id = 0;
  LinkedList<Number> numbers = new LinkedList<>
      (List.of(new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound),
          new Number(++id, r.nextInt(upperBound - lowerBound) + lowerBound)
      ));


  @Bean
  public Supplier<Message<Number>> sendNumber() {
    return () -> {
      if (numbers.peek() != null) {
        log.info("Number to send {} ", numbers.peek().getNumber());
        Message<Number> listNumber = MessageBuilder
            .withPayload(numbers.peek())
            .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(numbers.poll()).getId())
            .build();
        log.info("Order event sent with number: {}", listNumber.toString());
        return listNumber;
      } else {
        return null;
      }
    };
  }

}

Nuestro productor simplemente se encargará de enviar un evento con un número aleatorio definido en la clase Number.

Configuración Maven de Consumidor de Kafka Stream con Spring Cloud Stream

Vamos a comenzar añadiendo las dependencias Maven inprescindibles para nuestro consumidor:

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
		</dependency>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-stream-dependencies</artifactId>
				<version>${spring-cloud-stream.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

La primera dependencia será necesaria para añadir el binder de kafka stream.

Configuración de las propiedades de un Productor de Kafka Stream con Spring Cloud Stream

spring:
  application:
    name: number-consumer
  cloud:
    stream:
      function:
        definition: total
      bindings:
        total-in-0:
          destination: numbers
      kafka:
        bootstrap-servers: localhost:9092
        default:
          requiredAcks: all

Las propiedades a destacar en el consumidor serían:

  • spring.cloud.stream.function.definition: Indica el nombre del método del consumidor
  • spring.cloud.stream.bindings.total-in-0.destination: Indica el nombre del topic de kafka
  • spring.cloud.stream.kafka.bootstra-servers: Indica el host kafka

Creador de un consumidor de Kafka Stream con Spring Cloud Stream

Nuestro consumidor se va a encargar de procesar los mensajes que llegan y analizar si es par o impar. Para ello haremos uso de KStream para poder agrupar los eventos por el número que llega en el payload y mediante una agregación transformarlo para guardar los valores en un nuevo objeto y poder guardarlo mediante un keyValueBytesStoreSupplier. Es decir podemos guardar una clave valor en la que podremos hacer queries y recuperar los valores que necesitemos

@Bean
  public Consumer<KStream<Long, Num>> total() {
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
        "all-numbers-store");
    return transactions -> transactions
        .groupBy((k, v) -> v.getNumber(),
            Grouped.with(Serdes.Integer(), new JsonSerde<>(Num.class)))
        .aggregate(
            Total::new,
            (k, v, a) -> {
              a.setTotalEven((v.getNumber() % 2 == 0) ? a.getTotalEven() : a.getTotalOdd() + 1);
              a.setTotalOdd((v.getNumber() % 2 == 0) ? a.getTotalOdd() + 1 : a.getTotalOdd());
              return a;
            },
            Materialized.<Integer, Total>as(storeSupplier)
                .withKeySerde(Serdes.Integer())
                .withValueSerde(new JsonSerde<>(Total.class)))
        .toStream()
        .peek((k, v) -> log.info("Number value is {} ", v));


  }

Como podemos ver en el método consumidor hacemos uso de KeyValueBytesStoreSupplier para guardar y a continuación poder materializar el valor. Además hacemos uso de Serdes para la deserialización del objeto que llega.

Verificando el ejemplo

Para poder verificar nuestro ejemplo hemos creado un controlador y haciendo uso de la clase InteractiveQueryService, vamos a poder visualizar los datos materializados. Para ello vamos a hacer una query en la que se nos devuelve toda la información:

  @GetMapping("/all")
  public KeyValueIterator<Integer, Total> getAllTransactionsSummary() {
    ReadOnlyKeyValueStore<Integer, Total> keyValueStore =
        queryService.getQueryableStore("all-numbers-store",
            QueryableStoreTypes.keyValueStore());
    return keyValueStore.all();
  }

Si invocamos a localhost:8088/numbers/all, podremos ver los eventos guardados.

Conclusión

En esta entrada sobre KStream en Kafka Stream con Spring Boot, hemos visto una introducción mediante un ejemplo de procesamiento de eventos con Kafka Stream, haciendo uso de KStream. El API de Kafka Stream nos ofrece muchísimas más posibilidades así como diversas opciones, y en este artículo hemos visto una simple introducción.

Si quieres descargarte el ejemplo lo puedes hacer de aquí.

Si necesitas más información puedes escribirnos un comentario o un correo electrónico a refactorizando.web@gmail.com o también nos puedes contactar por nuestras redes sociales Facebook o twitter y te ayudaremos encantados!


Deja una respuesta

Tu dirección de correo electrónico no será publicada.