Kafka en Kubernetes con Strimzi


Cada vez es más frecuente el uso de kafka en nuestras arquitecturas, sobre todo en las orientadas a microservicios. En muchas ocasiones, su configuración y su despliegue nos lleva más tiempo o más complicaciones de las deseadas, además si le añadimos desplegarlo en una plataforma como Kubernetes, en la que necesitamos realizar configuraciones como certificados, roles, seguridad etc, ese tiempo puede aumentar todavía más. Es por ese motivo que surgen diferentes iniciativas como esta que vamos a ver en este nuevo artículo sobre Kafka en Kubernetes con Strimzi.

En este artículo vamos a ver como desplegar un clúster de Kafka de una manera sencilla y además poder consumirlo mediante SSL desde fuera del clúster de kubernetes, con las tools de Kafka y con una aplicación en Spring Boot. Todo ello haciendo uso de minikube.

¿Qué es Strimzi?

Strimzi es un proyecto Open Source que ofrece imágenes de contenedores y operadores para ejecutar Apache Kafka en Kubernetes y Red Hat OpenShift. Es decir, nos va a simplificar las tareas de desplegar Kafka en Kubernetes.

Los operadores que nos ofrece Strimzi nos van a ayudar a la gestión de Kafka y a simplificar el proceso de Deploy y de ejecutar clústers de Kafka y sus componentes, así como configuración, seguridad de acceso, gestión de topics usuarios etc…

Instalación de Strimzi con Helm

Una de las maneras más sencillas, por lo general, de realizar instalaciones en Kubernetes es haciendo uso de Helm. Para poder hacer uso de Helm con Strimzi vamos a ejecutar los siguientes comandos desde nuestro minikube:

helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-kafka strimzi/strimzi-kafka-operator

Para borrar la instalación que acabamos de hacer con Helm podemos usar:

helm uninstall strimzi-kafka

Vamos a aseguarnos que la instalación que hemos realizado de Strimzi ha sido correcta, para ello vamos a ver los pods y los replicaset que están corriendo:

 kubectl get all -l=name=strimzi-cluster-operator

NAME                                         READY   STATUS              RESTARTS   AGE
pod/strimzi-cluster-operator-85bb4c6-mfhmj   1/1     Running              0          9s

NAME                                               DESIRED   CURRENT   READY   AGE
replicaset.apps/strimzi-cluster-operator-85bb4c6   1         1         1       9s

Además con nuestra instalación de Strimzi se crean también Custom Resource Definitions, es decir, objetos que extienden del API de Kubernetes para poder introducir nuestra propia API en el clúster.

Una vez hemos levantado e instalado strimzi es momento de seguir con la instalación propiamente dicho de kafka.

Creación de Kafka Clúster con Strimzi

Con Strimzi instalado y ejecutandose es momento de realizar la instalación de un clúster de Kafka. Gracias a los Custom Resource Definitions que se han instalado (CRD) de Kubernetes vamos a poder definir un objeto propio que insertaremos en nuestro clúster.

El tipo de objeto que vamos a instalar es de tipo Kafka, vamos a crear su fichero de configuración:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 3.0.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral

Hemos específicado una configuración sencilla para nuestro clúster de kafka. En este fichero podemos ver como por un lado hemos definido zookeper y por otro kafka.

Vamos a ejecutar este fichero para ello hacemos:

kubectl apply -f <nombre-fichero> -n <namespace>

Una vez ejecutado nuestro Kafka clúster, strimzi nos crea diferentes recursos de kubernetes que se necesitan para habilitar Kafka, tales como, statefulset, service, configmap y secret.

Como hemos comentado anteriormente hemos extendido de CRD para crear poder incorporar nuestro propio recurso de Kubernetes, en este caso Kafka, por lo que vamos a ver que nos devuelve este recurso ejecutando:

kubectl get kafka
Kafka Operador en Strimzi | Kafka en Kubernetes con Strimzi
Kafka Operador en Strimzi

Podemos ver como con la creación de nuestro operador de kafka se ha desplegado una réplica de Kafka y una de Zookeeper.

A continuación vamos a ver la configuración de Zookeeper y de Kafka que se encontraran en los configmap con nombres, kafka-cluster-kafka-config y kafka-cluster-zookeeper-config.

Si queremos ver su configuración podemos hacer uso de los siguientes comandos.

kubectl get configmap kafka-cluster-kafka-config -o yaml
kubectl get configmap  kafka-cluster-zookeeper-config -o yaml

Nuestro fichero del nuevo recurso de Kafka nos ha creado una serie de servicios para que kafka sea accesible desde dentro de nuestro clúster:

$ kubectl get svc
NAME                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kafka-cluster-kafka-bootstrap    ClusterIP   10.103.14.204    <none>        9091/TCP,9092/TCP            5m55s
kafka-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP   5m55s
kafka-cluster-zookeeper-client   ClusterIP   10.101.115.90    <none>        2181/TCP                     7m28s
kafka-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   7m28s

Como podemos ver tenemos el cliente de zookeeper y el bootstrap de Kafka con una IP para que los clientes internos del clúster se puedan conectar a nuestro clúster, y por otro lado tenemos el brokers y nodos sin IP que son el Headless service de los stateful creados. Los Headless service son para aquellos casos en los que no se necesita un Load Balancer y una única IP es decir la IP del statefulset.

Crear Consumidor y Productor con Strimzi dentro del Clúster

Una vez que hemos hecho una configuración para poder hacer uso de Kafka dentro de un clúster de Kubernetes, es momento de hacer una prueba con un consumidor y un productor.

Para poder hacer uso de un productor ejecutamos el siguiente comando:

kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:latest-kafka-3.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic my-topic

Escribe cualquier cosa y ahora desde otra ventana ejecutamos el consumidor y vemos los mensajes:

kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:latest-kafka-3.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

Acceder a Kafka desde el exterior del Clúster con Strimzi

Cuando hacemos uso de la configuración por defecto de Strimzi, nuestro Kafka únicamente va a ser accesible desde dentro de nuestro clúster. Para que sea accesible desde fuera vamos a definir nuestro recurso Kafka como Loadbalancer, de esta manera hacemos nuestra aplicación accesible desde el exterior.

Cuando hacemos una aplicación accesible desde fuera de nuestro clúster es más que recomendable y necesario dotarlo de seguridad como MTLS o TLS.

Vamos a modificar nuestro fichero yaml de definición de Kafka para tener LoadBalancer. Vamos a cambiar el tipo a LoadBalancer y el nombre a external. El puerto al no estar todavía securizado lo dejamos en el 9092.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 3.0.0
    replicas: 1
    listeners:
      - name: external
        port: 9092
        tls:  false
        type: loadbalancer
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral

Ejecutamos:

kubectl apply -f <nombre-fichero>.yaml -n <namespace>

Y se nos habrá creado un nuevo servicio de tipo LoadBalancer que nos permitirá acceder desde el exterior. Este servicio se llamará kafka-cluster-kafka-plain-bootstrap. Al estar probando con minikube necesitamos crear un túnel, en otro caso la external IP aparecerá como pending:

minikube tunnel
minikube service kafka-cluster-kafka-plain-bootstrap

Probando kafka con LoadBalancer en Strimzi

Una vez hemos añadido un LoadBalancer a nuestro recurso de kafka creado con Strimzi, es momento de probarlo desde fuera de nuestro clúster.

Para ello podemos hacer uso de algún cliente de kafka o podemos utilizar alguna aplicación que se conecte a kafka.

En nuestro ejemplo vamos a hacer uso del productor y consumidor que nos proporciona kafka, para ello lo podemos descargar de aquí.

Vamos a arrancar un productor y un consumidor de la siguiente manera:

# on a terminal, start producer and send a few messages
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl.properties
# on another terminal, start consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning

Y podemos ver como productor y consumidor funcionan de manera correcta.

Añadir seguridad a Kafka para el acceso desde el exterior

Como ahora nuestro clúster puede ser accedido desde fuera dotar a nuestra infraestructura de seguridad es de vital importancia, para ello vamos a activar la seguridad de nuestro broker de kafka.

Partiendo de nuestro fichero yaml anterior, en el que hemos definido nuestro recurso kafka para Kubernetes, vamos a cambiar tls a true. De hecho este parámetro por defecto es true.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 3.0.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: loadbalancer
        tls: false
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral

Ahora aplicamos:

kubectl apply -f <fichero>.yaml -n <namespace>

Y veremos como se nos crean todos los recursos necesarios y se nos configura la seguridad por tls.

Para ver como la seguridad de nuestro clúster se ha activado, vamos a analizar el configmap y la configuración de kafka:

kubectl get configmap/kafka-cluster-kafka-config -o yaml

En la configuración ahora vemos que nuestros listeners siguen teniendo acceso por el puerto 9092, pero a parte también se ha habilitado un nuevo puerto el 9094 que será el puerto para SSL:

listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,EXTERNAL-9094://0.0.0.0:9094

listener.security.protocol.map=REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,EXTERNAL-9094:SSL

Probando kafka desde fuera del clúster con conexión TLS

Una vez que hemos configurado nuestro broker, vamos a querer acceder a el, para ello vamos a necesitar un certificado y su password. Strimzi nos ha generado estos valores, y los ha guardado en nuestro secret.

A continuación exportarmos el CA y la password para poder conectarnos a nuestro broker que se encuentra en dos secrets diferentes (para ver todos los secrets podemos hacer kubectl get secrets):

kubectl get secret kafka-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
kubectl get secret kafka-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password

Vamos a guardar el certificado en un trustore de confianza, para poder hacer uso de los clientes Java, si quieres saber más sobre certificados en Java, echa un vistazo aquí. Por lo que vamos a crear un keystore nuevo.

keytool -genkey -alias kafka -keyalg RSA -keystore kafka.jks -keysize 2048
#Se recomienda migrar a PKCS12 por lo que habría que aplicar la siguiente línea o generar directamente PKCS12
keytool -importkeystore -srckeystore kafka.jks -destkeystore kafka.jks -deststoretype pkcs12  

export CERT_FILE_PATH=ca.crt
export CERT_PASSWORD_FILE_PATH=ca.password
export KEYSTORE_LOCATION=kafka.jks
export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`
export CA_CERT_ALIAS=strimzi-kafka-cert

sudo keytool -importcert -alias $CA_CERT_ALIAS -file $CERT_FILE_PATH -keystore $KEYSTORE_CLIENT_LOCATION -keypass $PASSWORD
sudo keytool -list -alias $CA_CERT_ALIAS -keystore $KEYSTORE_LOCATION

Como vamos a conectarnos a Kafka desde fuera de nuestro clúster es necesario conocer la IP que nos proporciona nuestra configuración con Load Balancer. Para obtener la IP podemos ejecutar el siguiente comando:

kubectl get service/kafka-cluster-kafka-plain-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}

A continuación tenemos que crear un fichero ssl para indicar nuestros certificados y nuestra IP del Load Balancer.

bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
security.protocol=SSL
ssl.truststore.location=[TRUSTSTORE_LOCATION]
ssl.truststore.password=changeit

Una vez hemos creado este fichero de SSL.properties, se lo vamos a indicar tanto al consumidor de kafka como productor. Para ello ejecutamos en diferentes terminales:

#Productor
kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl.properties

#Consumidor
kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning

Al ejecutar los comandos anteriores, en diferentes terminales podremos ir enviando mensajes desde el terminal en el que hemos ejecutado el productor y los iremos viendo en el terminal en el que se está ejecutando el consumidor.

Probando Kafka con Strimzi en Spring Boot

Una vez hemos desplegado Kafka en nuestro clúster de Kubernetes con seguridad y además para que sea accedido desde fuera haciendo uso de un Load Balancer, vamos a conectarlo con Spring Boot.

Tal y como vimos en este artículo, Spring Boot nos ofrece muchas facilidades a través de su autoconfiguración para conectarnos a Kafka. Por lo que partiendo del ejemplo de ese artículo que podemos encontrar en github, vamos a añadir un nuevo application.yml en el que añadimos conexión por SSL haciendo uso del certificado que anteriormente hemos generado:

git clone https://github.com/refactorizando-web/kafka-spring-boot.git
kafka.topic.name: test-strimzi-topic


spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: 127.0.0.1:9094
    ssl:
      trust-store-location: classpath:/kafka.jks
      trust-store-password: changeit
    consumer:
      group-id: demo-group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Arrancamos nuestra aplicación Spring Boot con este perfil y con el certificado almacenado en la carpeta resources y veremos como se conecta a kafka.

Conclusión

En esta entrada sobre Kafka en Kubernetes con Strimzi, hemos visto las facilidades que nos ofrece Strimzi para poder configurar un kafka en un clúster de Kubernetes. Obviamente esto ha sido una breve introducción y hay muchas más posiblidades, creación de usuario, diferentes tipos de autenticación, etc…

Si necesitas más información puedes escribirnos un comentario o un correo electrónico a refactorizando.web@gmail.com o también nos puedes contactar por nuestras redes sociales Facebook o twitter y te ayudaremos encantados!



Deja una respuesta

Tu dirección de correo electrónico no será publicada.