---
title: 'Kafka'
---
Instalación y configuración de Kafka
===
# Kafka
Apache Kafka es una plataforma distribuida de transmisión de eventos que permite publicar, almacenar y procesar flujos de registros, y suscribirse a ellos, en tiempo real con un alto rendimiento y baja latencia. Está diseñada para manejar flujos de datos de varias fuentes y distribuirlos a los diversos usuarios. Consta de servidores y clientes que se comunican a través de un protocolo de red TCP de alto rendimiento y puede ser implementado en hardware bare-metal, máquinas virtuales y contenedores en entornos locales y en la nube. Apache Kafka es una solución open source con licencia Apache 2.0 con aplicaciones para diversas necesidades empresariales e industriales [1-3].
## Repositorio
``` shell=
$ https://gitlab.siare.gov.py/siare-develop/gestion-entornos.git
```
> Proyecto que contiene las configuraciones de los servicios para los diferentes entornos.
## Pasos de Instalación
### Copia local del repositorio
Ejecutar el siguiente comando para clonar el repositorio
``` shell=
$ git clone https://gitlab.siare.gov.py/siare-develop/gestion-entornos.git
```
El contenido del docker-compose.yml se describe a continuación:
``` shell=
version: '2.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.1.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zookeeper:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.1.1
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://192.168.169.104:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: 192.168.169.104
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zookeeper
```
Acceder al directorio del repositorio `./gestion-entornos/kafka/single-node` y ejecutar el siguiente comando:
``` shell=
$ docker-compose up -d
```
> El comando inicia los contenedores y lo ejecuta en segundo plano
Para verificar la correcta ejecución de los contenedores, se debe ejecutar el siguiente comando:
``` shell=
$ docker-compose ps
```
### Asignación de volumen externo
La asignación de volumen externo permite personalizar la ubicación de los datos generados por el kafka.
#### Creación de volumen
Se debe de identificar la ubicación de referencia para el volumen ejemplo:
`/opt/kafka/data`
Se debe de actualizar la configuración inicial del docker compose.
Es necesario agregar la siguiente configuración:
```
volumes:
kafka-data:
driver: local
driver_opts:
type: none
device: /opt/kafka/data
o: bind
```
y asignar el volumen creado al contenedor a ser creado por el compose:
```
volumes:
- kafka-data:/var/lib/kafka/data
```
Con estos dos cambios de configuración ya es posible personalizar la ubicación de los datos generados por el kafka.
Archivo final con las modificaciones:
```
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.1.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zookeeper:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.1.1
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "9999:9999"
volumes:
- kafka-data:/var/lib/kafka/data
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://192.168.169.111:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: 192.168.169.111
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zookeeper
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
KAFKA_CLUSTERS_0_NAME: "lab_siare"
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka1:9092"
depends_on:
- kafka1
volumes:
kafka-data:
driver: local
driver_opts:
type: none
device: /opt/kafka/data
o: bind
```
# Uso de Kafka con Spring
El proyecto Spring para Apache Kafka (spring-kafka) aplica los conceptos básicos de Spring al desarrollo de soluciones de mensajería basadas en Kafka. Proporciona una "plantilla" como abstracción de alto nivel para el envío de mensajes.
## Dependencia Maven
```xml=
<dependency>
<groupId>org.springframework.kafka</groupId
<artifactId>spring-kafka</artifactId>
</dependency>
```
## Configuración Java
En necesario la creación de una clase para la configuración del Kafka.
### KafkaConfig
Crear la clase KafkaConfig dentro del proyecto java con el siguiente contenido:
```java=
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import lombok.extern.slf4j.Slf4j;
@Configuration
@Slf4j
@EnableKafka
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(configs());
}
@Bean
public Map<String, Object> configs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.169.104:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
log.info("realizando configuracion de propiedades kafka");
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
```
> La clase tiene la responsabilidad de crear los objetos necesarios para la conexión con el servidor Kafka.
* @Configuration : Anotación que indica al spring que la clase será utilizada como fuente de definición de Bean para el contexto de la aplicación
* @Slf4j : Anotación de la librería *lombok*, que permite crear logs
* EnableKafka : Anotación que permite la detención de los beans anotados con @KafkaListener y que son gestionados por el Spring
* @Bean : Anotación que nos permite generar Objetos en forma personalizada dentro del contexto de la aplicación.
### Uso del Kafka
Para la utilización del Kafka se ha generado los siguientes archivos :
### SiareKafka
Anotación que permite identificar donde será requerido el acceso al servidor de Kafka
```java=
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SiareKafka {
}
```
* @Target : Indica donde será aplicado la anotación, en este caso será utilizado a nivel de método unicamente (ElementType.Method).
* @Retention : Indica si la anotación estará disponible para el JVM en tiempo de ejecución.
### SiareKafkaAspect
Clase de Aspecto que intercepta las invocaciones de métodos de servicio, todas las clases de Aspecto debe llevar la anotación **@Aspect**
```java=
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
@Aspect
@Component
@AllArgsConstructor
public class SiareKafkaAspect {
// Definición de atributos para la gestión de objetos del server de kafka
private KafkaTemplate<String, String> kafkaTemplate;
private ObjectMapper objectMapper;
@Around("@annotation(SiareKafka)")
public Object cache(ProceedingJoinPoint pjp) throws Throwable {
//Invocación del método original para obtener el objeto solicitado
Object objetoStream = pjp.proceed();
String nuevoObjeto = objectMapper.writeValueAsString(objetoStream);
kafkaTemplate.send(pjp.getTarget().getClass().getSimpleName(), nuevoObjeto);
return objetoStream;
}
}
```
* @Component : Anotación que indica al spring que la clase será un Bean gestionado dentro del contexto de la aplicación
* @AllArgsConstructor : Anotación de la librería *lombok*, que permite generar un constructor con todos los atributos de la clase.
### Ejemplo de utilización
En las implementaciones de los servicios agregamos la anotacion **@SiareKafka**
```java=
@SiareKafka
@Override
public ResponseDto<List<MetodosDto>> obtenerRegistrosPaginadoOrdenado(
String filtros,
int page,
int cantidad,
String orderBy,
String orderDir) {
return super.obtenerRegistrosPaginadoOrdenado(
filtros,
page,
cantidad,
orderBy,
orderDir);
}
```
### Configuraciones Utilizando Siare-Common
Para mejorar el desarrollo se ha modificado la libreria siare-common, la cual incorpora la configuracion de kafka y las variables necesarias para configurar de manera simple.
Las variables que se deben configurar en el archivo *application.properties* son las siguientes:
- _siare.kafka.enabled_, esta variable determina si es que en el microservicio se da o no soporte a kafka, por omision esta se encuentra en falso, para poder habilitar la incorporacion se debe agregar en verdadero (true).
- _kafka.brokers_, esta variable determina la ruta de los brokers de kafka, por ejemplo, kafka-broker1:9200
- _kafka.user_, nombre de usuario asociado a kafka, si no se utiliza usuario y password, se deben agregar de igual manera en vacio.
- _kafka.password_, clave asociada al usuario declarado anteriormente.
- _kafka.groupid_, grupo con el cual se registrara el microservicio.
- _kafka.consumer.concurrency_, cantidad de consumers que se se generaran, se recomienda analizar de acuerdo al tipo de microservicio.
- _kafka.siare.consumer.max-poll-records_, esta variable permite determinarla cantidad de registros maximos por cada consumer, su configuracion es opcional. Por default se encuentra en 500 mensajes por consumer.
- _kafka.siare.consumer.fetch-max-wait-ms_, cantidad maxima en milisegundos que se espera la ejecucion de un pool, por definicion son 30 segundos, si las tareas son de alta carga y requieren de mas tiempo se sugiere modificar el tiempo.
- _kafka.siare.consumer.heartbeat-interval-ms_, tiempo entre intervalos de latido del consumer contra el brokers, por edicion son 3 segundos, se debe modificar si la latencia de conexion del microservicio es alta.
- _kafka.siare.consumer.session-timeout-ms_, timeout del consumer en caso de problemas.
Se debe tener en cuenta que si un consumer se desconecta del broker, este automaticamente reintenta la conexion a menos que el broker haga un kick-out, en donde el consumer no se reconectara y se debera reinciar el microservicio.
Ejemplo de configuracion:
```bash=
siare.kafka.enabled = true
kafka.brokers = kafka-broker-1:9200;kafka-broker-2:9200;kafka-broker-3:9200
kafka.user =
kafka.password =
kafka.groupid = foo_microservice
```
# Comandos de streams
Para poder ver el listado de streams que se encuentran actualmente en kafka
```bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```
A continuacion se detalla como se deben configurar los streams, indicando la retencion asociada a cada uno de los mismos.
```bash
bin/kafka-topics.sh --zookeeper "kafka-node-1:2181" --alter --topic auditoria --config retention.ms=86400000
```
Este comando se utilizada para indicar que se esta haciendo una modificacion sobre el topico de auditoria y que la retencion de los mensajes sera de 2,4 horas (o 144 minutos).
# Streams y configuracion
A continuacion se muestran los streams y la configuracion sugerida a los mismos, cabe destacar que los stream/topicos se iran agregando al sistema por lo que la documentacion de los mismos y configuracion debe ser actualizada en este documento.
|Topico|Retencion|
|------|---------|
|auditoria|86.400.000 (1 dia)|
|indexar-almacenar|86.400.000 (1 dia)|
|restablece-contrasena| 43.200.000 (12 horas)|
# Referencias
[1] Wu, H., Shang, Z., & Wolter, K. (2019, August). Performance prediction for the apache kafka messaging system. In 2019 IEEE 21st International Conference on High Performance Computing and Communications; IEEE 17th International Conference on Smart City; IEEE 5th International Conference on Data Science and Systems (HPCC/SmartCity/DSS) (pp. 154-161). IEEE.
[2] Díaz, M., Martín, C., & Rubio, B. (2016). State-of-the-art, challenges, and open issues in the integration of Internet of things and cloud computing. Journal of Network and Computer applications, 67, 99-117. Doi: https://doi.org/10.1016/j.jnca.2016.01.010
[3] Integración. ¿Qué es Apache Kafka?. Recuperado 20 de agosto de 2021, de https://www.redhat.com/es/topics/integration/what-is-apache-kafka