--- title: 'Kafka' --- Instalación y configuración de Kafka === # Kafka Apache Kafka es una plataforma distribuida de transmisión de eventos que permite publicar, almacenar y procesar flujos de registros, y suscribirse a ellos, en tiempo real con un alto rendimiento y baja latencia. Está diseñada para manejar flujos de datos de varias fuentes y distribuirlos a los diversos usuarios. Consta de servidores y clientes que se comunican a través de un protocolo de red TCP de alto rendimiento y puede ser implementado en hardware bare-metal, máquinas virtuales y contenedores en entornos locales y en la nube. Apache Kafka es una solución open source con licencia Apache 2.0 con aplicaciones para diversas necesidades empresariales e industriales [1-3]. ## Repositorio ``` shell= $ https://gitlab.siare.gov.py/siare-develop/gestion-entornos.git ``` > Proyecto que contiene las configuraciones de los servicios para los diferentes entornos. ## Pasos de Instalación ### Copia local del repositorio Ejecutar el siguiente comando para clonar el repositorio ``` shell= $ git clone https://gitlab.siare.gov.py/siare-develop/gestion-entornos.git ``` El contenido del docker-compose.yml se describe a continuación: ``` shell= version: '2.1' services: zookeeper: image: confluentinc/cp-zookeeper:7.1.1 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zookeeper:2888:3888 kafka1: image: confluentinc/cp-kafka:7.1.1 hostname: kafka1 container_name: kafka1 ports: - "9092:9092" - "9999:9999" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://192.168.169.104:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: 192.168.169.104 KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zookeeper ``` Acceder al directorio del repositorio `./gestion-entornos/kafka/single-node` y ejecutar el siguiente comando: ``` shell= $ docker-compose up -d ``` > El comando inicia los contenedores y lo ejecuta en segundo plano Para verificar la correcta ejecución de los contenedores, se debe ejecutar el siguiente comando: ``` shell= $ docker-compose ps ``` ### Asignación de volumen externo La asignación de volumen externo permite personalizar la ubicación de los datos generados por el kafka. #### Creación de volumen Se debe de identificar la ubicación de referencia para el volumen ejemplo: `/opt/kafka/data` Se debe de actualizar la configuración inicial del docker compose. Es necesario agregar la siguiente configuración: ``` volumes: kafka-data: driver: local driver_opts: type: none device: /opt/kafka/data o: bind ``` y asignar el volumen creado al contenedor a ser creado por el compose: ``` volumes: - kafka-data:/var/lib/kafka/data ``` Con estos dos cambios de configuración ya es posible personalizar la ubicación de los datos generados por el kafka. Archivo final con las modificaciones: ``` version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.1.1 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zookeeper:2888:3888 kafka1: image: confluentinc/cp-kafka:7.1.1 hostname: kafka1 container_name: kafka1 ports: - "9092:9092" - "9999:9999" volumes: - kafka-data:/var/lib/kafka/data environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://192.168.169.111:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: 192.168.169.111 KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zookeeper kafka-ui: container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: - 8080:8080 environment: KAFKA_CLUSTERS_0_NAME: "lab_siare" KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka1:9092" depends_on: - kafka1 volumes: kafka-data: driver: local driver_opts: type: none device: /opt/kafka/data o: bind ``` # Uso de Kafka con Spring El proyecto Spring para Apache Kafka (spring-kafka) aplica los conceptos básicos de Spring al desarrollo de soluciones de mensajería basadas en Kafka. Proporciona una "plantilla" como abstracción de alto nivel para el envío de mensajes. ## Dependencia Maven ```xml= <dependency> <groupId>org.springframework.kafka</groupId <artifactId>spring-kafka</artifactId> </dependency> ``` ## Configuración Java En necesario la creación de una clase para la configuración del Kafka. ### KafkaConfig Crear la clase KafkaConfig dentro del proyecto java con el siguiente contenido: ```java= import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import lombok.extern.slf4j.Slf4j; @Configuration @Slf4j @EnableKafka public class KafkaConfig { @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(configs()); } @Bean public Map<String, Object> configs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.169.104:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); log.info("realizando configuracion de propiedades kafka"); return props; } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } } ``` > La clase tiene la responsabilidad de crear los objetos necesarios para la conexión con el servidor Kafka. * @Configuration : Anotación que indica al spring que la clase será utilizada como fuente de definición de Bean para el contexto de la aplicación * @Slf4j : Anotación de la librería *lombok*, que permite crear logs * EnableKafka : Anotación que permite la detención de los beans anotados con @KafkaListener y que son gestionados por el Spring * @Bean : Anotación que nos permite generar Objetos en forma personalizada dentro del contexto de la aplicación. ### Uso del Kafka Para la utilización del Kafka se ha generado los siguientes archivos : ### SiareKafka Anotación que permite identificar donde será requerido el acceso al servidor de Kafka ```java= import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SiareKafka { } ``` * @Target : Indica donde será aplicado la anotación, en este caso será utilizado a nivel de método unicamente (ElementType.Method). * @Retention : Indica si la anotación estará disponible para el JVM en tiempo de ejecución. ### SiareKafkaAspect Clase de Aspecto que intercepta las invocaciones de métodos de servicio, todas las clases de Aspecto debe llevar la anotación **@Aspect** ```java= import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; @Aspect @Component @AllArgsConstructor public class SiareKafkaAspect { // Definición de atributos para la gestión de objetos del server de kafka private KafkaTemplate<String, String> kafkaTemplate; private ObjectMapper objectMapper; @Around("@annotation(SiareKafka)") public Object cache(ProceedingJoinPoint pjp) throws Throwable { //Invocación del método original para obtener el objeto solicitado Object objetoStream = pjp.proceed(); String nuevoObjeto = objectMapper.writeValueAsString(objetoStream); kafkaTemplate.send(pjp.getTarget().getClass().getSimpleName(), nuevoObjeto); return objetoStream; } } ``` * @Component : Anotación que indica al spring que la clase será un Bean gestionado dentro del contexto de la aplicación * @AllArgsConstructor : Anotación de la librería *lombok*, que permite generar un constructor con todos los atributos de la clase. ### Ejemplo de utilización En las implementaciones de los servicios agregamos la anotacion **@SiareKafka** ```java= @SiareKafka @Override public ResponseDto<List<MetodosDto>> obtenerRegistrosPaginadoOrdenado( String filtros, int page, int cantidad, String orderBy, String orderDir) { return super.obtenerRegistrosPaginadoOrdenado( filtros, page, cantidad, orderBy, orderDir); } ``` ### Configuraciones Utilizando Siare-Common Para mejorar el desarrollo se ha modificado la libreria siare-common, la cual incorpora la configuracion de kafka y las variables necesarias para configurar de manera simple. Las variables que se deben configurar en el archivo *application.properties* son las siguientes: - _siare.kafka.enabled_, esta variable determina si es que en el microservicio se da o no soporte a kafka, por omision esta se encuentra en falso, para poder habilitar la incorporacion se debe agregar en verdadero (true). - _kafka.brokers_, esta variable determina la ruta de los brokers de kafka, por ejemplo, kafka-broker1:9200 - _kafka.user_, nombre de usuario asociado a kafka, si no se utiliza usuario y password, se deben agregar de igual manera en vacio. - _kafka.password_, clave asociada al usuario declarado anteriormente. - _kafka.groupid_, grupo con el cual se registrara el microservicio. - _kafka.consumer.concurrency_, cantidad de consumers que se se generaran, se recomienda analizar de acuerdo al tipo de microservicio. - _kafka.siare.consumer.max-poll-records_, esta variable permite determinarla cantidad de registros maximos por cada consumer, su configuracion es opcional. Por default se encuentra en 500 mensajes por consumer. - _kafka.siare.consumer.fetch-max-wait-ms_, cantidad maxima en milisegundos que se espera la ejecucion de un pool, por definicion son 30 segundos, si las tareas son de alta carga y requieren de mas tiempo se sugiere modificar el tiempo. - _kafka.siare.consumer.heartbeat-interval-ms_, tiempo entre intervalos de latido del consumer contra el brokers, por edicion son 3 segundos, se debe modificar si la latencia de conexion del microservicio es alta. - _kafka.siare.consumer.session-timeout-ms_, timeout del consumer en caso de problemas. Se debe tener en cuenta que si un consumer se desconecta del broker, este automaticamente reintenta la conexion a menos que el broker haga un kick-out, en donde el consumer no se reconectara y se debera reinciar el microservicio. Ejemplo de configuracion: ```bash= siare.kafka.enabled = true kafka.brokers = kafka-broker-1:9200;kafka-broker-2:9200;kafka-broker-3:9200 kafka.user = kafka.password = kafka.groupid = foo_microservice ``` # Comandos de streams Para poder ver el listado de streams que se encuentran actualmente en kafka ```bash bin/kafka-topics.sh --list --bootstrap-server localhost:9092 ``` A continuacion se detalla como se deben configurar los streams, indicando la retencion asociada a cada uno de los mismos. ```bash bin/kafka-topics.sh --zookeeper "kafka-node-1:2181" --alter --topic auditoria --config retention.ms=86400000 ``` Este comando se utilizada para indicar que se esta haciendo una modificacion sobre el topico de auditoria y que la retencion de los mensajes sera de 2,4 horas (o 144 minutos). # Streams y configuracion A continuacion se muestran los streams y la configuracion sugerida a los mismos, cabe destacar que los stream/topicos se iran agregando al sistema por lo que la documentacion de los mismos y configuracion debe ser actualizada en este documento. |Topico|Retencion| |------|---------| |auditoria|86.400.000 (1 dia)| |indexar-almacenar|86.400.000 (1 dia)| |restablece-contrasena| 43.200.000 (12 horas)| # Referencias [1] Wu, H., Shang, Z., & Wolter, K. (2019, August). Performance prediction for the apache kafka messaging system. In 2019 IEEE 21st International Conference on High Performance Computing and Communications; IEEE 17th International Conference on Smart City; IEEE 5th International Conference on Data Science and Systems (HPCC/SmartCity/DSS) (pp. 154-161). IEEE. [2] Díaz, M., Martín, C., & Rubio, B. (2016). State-of-the-art, challenges, and open issues in the integration of Internet of things and cloud computing. Journal of Network and Computer applications, 67, 99-117. Doi: https://doi.org/10.1016/j.jnca.2016.01.010 [3] Integración. ¿Qué es Apache Kafka?. Recuperado 20 de agosto de 2021, de https://www.redhat.com/es/topics/integration/what-is-apache-kafka