Kafka en Kubernetes con Strimzi
Cada vez es más frecuente el uso de kafka en nuestras arquitecturas, sobre todo en las orientadas a microservicios. En muchas ocasiones, su configuración y su despliegue nos lleva más tiempo o más complicaciones de las deseadas, además si le añadimos desplegarlo en una plataforma como Kubernetes, en la que necesitamos realizar configuraciones como certificados, roles, seguridad etc, ese tiempo puede aumentar todavía más. Es por ese motivo que surgen diferentes iniciativas como esta que vamos a ver en este nuevo artículo sobre Kafka en Kubernetes con Strimzi.
En este artículo vamos a ver como desplegar un clúster de Kafka de una manera sencilla y además poder consumirlo mediante SSL desde fuera del clúster de kubernetes, con las tools de Kafka y con una aplicación en Spring Boot. Todo ello haciendo uso de minikube.
¿Qué es Strimzi?
Strimzi es un proyecto Open Source que ofrece imágenes de contenedores y operadores para ejecutar Apache Kafka en Kubernetes y Red Hat OpenShift. Es decir, nos va a simplificar las tareas de desplegar Kafka en Kubernetes.
Los operadores que nos ofrece Strimzi nos van a ayudar a la gestión de Kafka y a simplificar el proceso de Deploy y de ejecutar clústers de Kafka y sus componentes, así como configuración, seguridad de acceso, gestión de topics usuarios etc…
Instalación de Strimzi con Helm
Una de las maneras más sencillas, por lo general, de realizar instalaciones en Kubernetes es haciendo uso de Helm. Para poder hacer uso de Helm con Strimzi vamos a ejecutar los siguientes comandos desde nuestro minikube:
helm repo add strimzi https://strimzi.io/charts/ helm install strimzi-kafka strimzi/strimzi-kafka-operator
Para borrar la instalación que acabamos de hacer con Helm podemos usar:
helm uninstall strimzi-kafka
Vamos a aseguarnos que la instalación que hemos realizado de Strimzi ha sido correcta, para ello vamos a ver los pods y los replicaset que están corriendo:
kubectl get all -l=name=strimzi-cluster-operator NAME READY STATUS RESTARTS AGE pod/strimzi-cluster-operator-85bb4c6-mfhmj 1/1 Running 0 9s NAME DESIRED CURRENT READY AGE replicaset.apps/strimzi-cluster-operator-85bb4c6 1 1 1 9s
Además con nuestra instalación de Strimzi se crean también Custom Resource Definitions, es decir, objetos que extienden del API de Kubernetes para poder introducir nuestra propia API en el clúster.
Una vez hemos levantado e instalado strimzi es momento de seguir con la instalación propiamente dicho de kafka.
Creación de Kafka Clúster con Strimzi
Con Strimzi instalado y ejecutandose es momento de realizar la instalación de un clúster de Kafka. Gracias a los Custom Resource Definitions que se han instalado (CRD) de Kubernetes vamos a poder definir un objeto propio que insertaremos en nuestro clúster.
Aunque para este artículo nos vamos a centrar en el operador de Kafka, Strimzi nos crea más operadores tales como:
- Cluster Operator: Despliega y gestiona Apache Kafka clusters, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, y el Entity Operator.
- Entity Operator: Para el operador de Topic y el operador de User.
- Topic Operator: Manages Kafka topicsUser Operator
- User Operator: Gestiona los usuarios de Kafka.
Podemos ver los objetos creados de Strimzi con el siguiente comando:
kubectl get crd
El tipo de objeto que vamos a instalar es de tipo Kafka, vamos a crear su fichero de configuración:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-cluster spec: kafka: version: 3.0.0 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 log.message.format.version: "2.4" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral
Hemos específicado una configuración sencilla para nuestro clúster de kafka. En este fichero podemos ver como por un lado hemos definido zookeper y por otro kafka.
Vamos a ejecutar este fichero para ello hacemos:
kubectl apply -f <nombre-fichero> -n <namespace>
Una vez ejecutado nuestro Kafka clúster, strimzi nos crea diferentes recursos de kubernetes que se necesitan para habilitar Kafka, tales como, statefulset, service, configmap y secret.
Como hemos comentado anteriormente hemos extendido de CRD para crear poder incorporar nuestro propio recurso de Kubernetes, en este caso Kafka, por lo que vamos a ver que nos devuelve este recurso ejecutando:
kubectl get kafka
Podemos ver como con la creación de nuestro operador de kafka se ha desplegado una réplica de Kafka y una de Zookeeper.
Para ello también podemos ver los pods que se han creado:
A continuación vamos a ver la configuración de Zookeeper y de Kafka que se encontraran en los configmap con nombres, kafka-cluster-kafka-config y kafka-cluster-zookeeper-config.
Si queremos ver su configuración podemos hacer uso de los siguientes comandos.
kubectl get configmap kafka-cluster-kafka-config -o yaml kubectl get configmap kafka-cluster-zookeeper-config -o yaml
Nuestro fichero del nuevo recurso de Kafka nos ha creado una serie de servicios para que kafka sea accesible desde dentro de nuestro clúster:
$ kubectl get svc NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE kafka-cluster-kafka-bootstrap ClusterIP 10.103.14.204 <none> 9091/TCP,9092/TCP 5m55s kafka-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 5m55s kafka-cluster-zookeeper-client ClusterIP 10.101.115.90 <none> 2181/TCP 7m28s kafka-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 7m28s
Como podemos ver tenemos el cliente de zookeeper y el bootstrap de Kafka con una IP para que los clientes internos del clúster se puedan conectar a nuestro clúster, y por otro lado tenemos el brokers y nodos sin IP que son el Headless service de los stateful creados. Los Headless service son para aquellos casos en los que no se necesita un Load Balancer y una única IP es decir la IP del statefulset.
Crear Consumidor y Productor con Strimzi dentro del Clúster
Una vez que hemos hecho una configuración para poder hacer uso de Kafka dentro de un clúster de Kubernetes, es momento de hacer una prueba con un consumidor y un productor.
Para poder hacer uso de un productor ejecutamos el siguiente comando:
kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:latest-kafka-3.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic my-topic
Escribe cualquier cosa y ahora desde otra ventana ejecutamos el consumidor y vemos los mensajes:
kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:latest-kafka-3.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
Acceder a Kafka desde el exterior del Clúster con Strimzi
Cuando hacemos uso de la configuración por defecto de Strimzi, nuestro Kafka únicamente va a ser accesible desde dentro de nuestro clúster. Para que sea accesible desde fuera vamos a definir nuestro recurso Kafka como Loadbalancer, de esta manera hacemos nuestra aplicación accesible desde el exterior.
Cuando hacemos una aplicación accesible desde fuera de nuestro clúster es más que recomendable y necesario dotarlo de seguridad como MTLS o TLS.
Vamos a modificar nuestro fichero yaml de definición de Kafka para tener LoadBalancer. Vamos a cambiar el tipo a LoadBalancer y el nombre a external. El puerto al no estar todavía securizado lo dejamos en el 9092.
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-cluster spec: kafka: version: 3.0.0 replicas: 1 listeners: - name: external port: 9092 tls: false type: loadbalancer config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 log.message.format.version: "2.4" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral
Ejecutamos:
kubectl apply -f <nombre-fichero>.yaml -n <namespace>
Y se nos habrá creado un nuevo servicio de tipo LoadBalancer que nos permitirá acceder desde el exterior. Este servicio se llamará kafka-cluster-kafka-plain-bootstrap. Al estar probando con minikube necesitamos crear un túnel, en otro caso la external IP aparecerá como pending:
minikube tunnel
minikube service kafka-cluster-kafka-plain-bootstrap
Probando kafka con LoadBalancer en Strimzi
Una vez hemos añadido un LoadBalancer a nuestro recurso de kafka creado con Strimzi, es momento de probarlo desde fuera de nuestro clúster.
Para ello podemos hacer uso de algún cliente de kafka o podemos utilizar alguna aplicación que se conecte a kafka.
En nuestro ejemplo vamos a hacer uso del productor y consumidor que nos proporciona kafka, para ello lo podemos descargar de aquí.
Para obtener la IP de kafka para que pueda ser consumida desde el exterior podemos hacer uso del siguiente comando:
kubectl get service kafka-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'
Vamos a arrancar un productor y un consumidor de la siguiente manera:
# on a terminal, start producer and send a few messages $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9092 --topic $TOPIC_NAME # on another terminal, start consumer $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9092 --topic $TOPIC_NAME --from-beginning
Y podemos ver como productor y consumidor funcionan de manera correcta.
Añadir seguridad a Kafka para el acceso desde el exterior
Como ahora nuestro clúster puede ser accedido desde fuera dotar a nuestra infraestructura de seguridad es de vital importancia, para ello vamos a activar la seguridad de nuestro broker de kafka.
Partiendo de nuestro fichero yaml anterior, en el que hemos definido nuestro recurso kafka para Kubernetes, vamos a cambiar tls a true. De hecho este parámetro por defecto es true.
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-cluster spec: kafka: version: 3.0.0 replicas: 1 listeners: - name: plain port: 9092 type: loadbalancer tls: false - name: external port: 9094 type: loadbalancer tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 log.message.format.version: "2.4" storage: type: ephemeral zookeeper: replicas: 1 storage: type: ephemeral
Ahora aplicamos:
kubectl apply -f <fichero>.yaml -n <namespace>
Y veremos como se nos crean todos los recursos necesarios y se nos configura la seguridad por tls.
Para ver como la seguridad de nuestro clúster se ha activado, vamos a analizar el configmap y la configuración de kafka:
kubectl get configmap/kafka-cluster-kafka-config -o yaml
En la configuración ahora vemos que nuestros listeners siguen teniendo acceso por el puerto 9092, pero a parte también se ha habilitado un nuevo puerto el 9094 que será el puerto para SSL:
listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,EXTERNAL-9094://0.0.0.0:9094 listener.security.protocol.map=REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,EXTERNAL-9094:SSL
Probando kafka desde fuera del clúster con conexión TLS
Una vez que hemos configurado nuestro broker, vamos a querer acceder a el, para ello vamos a necesitar un certificado y su password. Strimzi nos ha generado estos valores, y los ha guardado en nuestro secret.
A continuación exportarmos el CA y la password para poder conectarnos a nuestro broker que se encuentra en dos secrets diferentes (para ver todos los secrets podemos hacer kubectl get secrets):
kubectl get secret kafka-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt kubectl get secret kafka-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password
Vamos a guardar el certificado en un trustore de confianza, para poder hacer uso de los clientes Java, si quieres saber más sobre certificados en Java, echa un vistazo aquí. Por lo que vamos a crear un keystore nuevo.
keytool -genkey -alias kafka -keyalg RSA -keystore kafka.jks -keysize 2048 #Se recomienda migrar a PKCS12 por lo que habría que aplicar la siguiente línea o generar directamente PKCS12 keytool -importkeystore -srckeystore kafka.jks -destkeystore kafka.jks -deststoretype pkcs12 export CERT_FILE_PATH=ca.crt export CERT_PASSWORD_FILE_PATH=ca.password export KEYSTORE_CLIENT_LOCATION=kafka.jks export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH` export CA_CERT_ALIAS=strimzi-kafka-cert sudo keytool -importcert -alias $CA_CERT_ALIAS -file $CERT_FILE_PATH -keystore $KEYSTORE_CLIENT_LOCATION -keypass $PASSWORD sudo keytool -list -alias $CA_CERT_ALIAS -keystore $KEYSTORE_CLIENT_LOCATION
Como vamos a conectarnos a Kafka desde fuera de nuestro clúster es necesario conocer la IP que nos proporciona nuestra configuración con Load Balancer. Para obtener la IP podemos ejecutar el siguiente comando:
kubectl get service/kafka-cluster-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}
A continuación tenemos que crear un fichero ssl (con nombre por ejemplo SSL.properties) para indicar nuestros certificados y nuestra IP del Load Balancer.
bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094 security.protocol=SSL ssl.truststore.location=[TRUSTSTORE_LOCATION] ssl.truststore.password=changeit
Una vez hemos creado este fichero de SSL.properties, se lo vamos a indicar tanto al consumidor de kafka como productor. Para ello ejecutamos en diferentes terminales:
#Productor kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl.properties #Consumidor kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning
Al ejecutar los comandos anteriores, en diferentes terminales podremos ir enviando mensajes desde el terminal en el que hemos ejecutado el productor y los iremos viendo en el terminal en el que se está ejecutando el consumidor.
Probando Kafka con Strimzi en Spring Boot
Una vez hemos desplegado Kafka en nuestro clúster de Kubernetes con seguridad y además para que sea accedido desde fuera haciendo uso de un Load Balancer, vamos a conectarlo con Spring Boot.
Tal y como vimos en este artículo, Spring Boot nos ofrece muchas facilidades a través de su autoconfiguración para conectarnos a Kafka. Por lo que partiendo del ejemplo de ese artículo que podemos encontrar en github, vamos a añadir un nuevo application.yml en el que añadimos conexión por SSL haciendo uso del certificado que anteriormente hemos generado:
git clone https://github.com/refactorizando-web/kafka-spring-boot.git
kafka.topic.name: test-strimzi-topic spring: kafka: security: protocol: "SSL" bootstrap-servers: 127.0.0.1:9094 ssl: trust-store-location: classpath:/kafka.jks trust-store-password: changeit consumer: group-id: demo-group-id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Arrancamos nuestra aplicación Spring Boot con este perfil y con el certificado almacenado en la carpeta resources y veremos como se conecta a kafka.
Conclusión
En esta entrada sobre Kafka en Kubernetes con Strimzi, hemos visto las facilidades que nos ofrece Strimzi para poder configurar un kafka en un clúster de Kubernetes. Obviamente esto ha sido una breve introducción y hay muchas más posiblidades, creación de usuario, diferentes tipos de autenticación, etc…
Si necesitas más información puedes escribirnos un comentario o un correo electrónico a refactorizando.web@gmail.com o también nos puedes contactar por nuestras redes sociales Facebook o twitter y te ayudaremos encantados!