# Analisis de un comando laravel que consume un stream de debezium. ## Debezium Transmita los cambios de su base de datos. Debezium es una plataforma distribuida de código abierto para la captura de datos de cambios. Inícielo, apúntelo a sus bases de datos y sus aplicaciones pueden comenzar a responder a todas las inserciones, actualizaciones y eliminaciones que otras aplicaciones envían a sus bases de datos. Debezium es duradero y rápido, por lo que sus aplicaciones pueden responder rápidamente y nunca perderse un evento, incluso cuando las cosas van mal. Ddebezium es rápido y eso significa que sus aplicaciones y servicios pueden reaccionar rápidamente. Debezium está construido sobre Apache Kafka, que está probado, es escalable y maneja grandes volúmenes de datos muy rápidamente. ## Ejecutando el comando php artisan kafka:listen --topic=topic_name ## La función Handle Este comando ejecuta el siguiente codigo, que como vemos instancia KafkaService y luego ejecuta ejecuta `KafkaService::consumeTopics` ```php public function handle() { $this->info('Init kafka consumer work'); $topic = $this->option('topic'); $service = new KafkaService($topic); $service->consumeTopics(); } ``` ## Instanciando un nuevo KafkaService Si vamos `KafkaService::construct` vemos que tenemos unas inicializaciones y seteo de algunas rutas en base a la configuración de nuestro `.env` ```php public function __construct($topic = null) { $this->kafkaHandlerService = app()->make(KafkaHandlerService::class); $this->kafkaServer = env('KAFKA_HOST', '127.0.0.1') . ':' . env('KAFKA_PORT', '9092'); $this->kafkaHost = env('KAFKA_HOST', '127.0.0.1') . ':' . env('KAFKA_DZ_PORT', '8081'); $this->binaryPath = env('KAFKA_CONSUMER_BINARY_PATH', ''); $this->binary = env('KAFKA_CONSUMER_BINARY', 'kafka-console-consumdasder'); $this->topic = $topic; } ``` ## LLamando a `KafkaService::consumeTopics` si vamos a `KafkaService::consumeTopics` vemos que primero se valida la presencia de un topic. <i>En kafka, un '<b>topic</b>' es un canal por el cual va fluir información.</i> Luego llama directamente a `KafkaSetrvice::handleConsumer` ```php public function consumeTopics() { if(empty($this->topic)) throw new \Exception('please specify a topic to consumer with --topic options'); $this->handleConsumer(); } ``` ## Llamando a `KafkaSetrvice::handleConsumer` Si vamos a `KafkaService::handleConsumer`, vemos que se ejecuta un comando de kafka. Esto se hace a través de la interfaz `Smfony\Components\Process\Process`. El comando que finalmente se ejecuta es algo como kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --property schema.registry.url=http://127.0.0.1:8081 --topic topic_name Y la salida de este comando es capturado desde la salida estandar en la variable `$message` y redirigido a `kafkaHandlerService::handleMessage` ```php protected function handleConsumer() { $command[] = $this->binaryPath . $this->binary; $command[] = '--bootstrap-server'; $command[] = $this->kafkaServer; $command[] = '--topic'; $command[] = $this->topic; $command[] = '--from-beginning'; $command[] = ' --property schema.registry.url=http://' . $this->kafkaHost; $process = new Process($command); $process->setTimeout(0); $process->start(); foreach ($process as $type => $message) { if ($process::OUT === $type) { $this->kafkaHandlerService->handleMessage($message, $this->topic); } else { // $process::ERR === $type echo "\nRead from stderr: " . $message; } } } ``` ## Aclaración Intermedia Antes de continuar con el trace , hagamos un pequeño análisis y veamos la función que manejará la respuesta después que la llamada kafka sea ejecutada. ### kafkaHandlerService::construct Lo primero es obtener la configuración de kafka que está dentro dentro de `config/kafka.php`. Esto se realiza mediante el helper `config()->get('kafka')` que nos proporciona laravel. Luego intentamos obtener una instancia de la handler_class que previamente tenemos que definir. Esto se hace comodamente dsde el resolvedor de contenedores de laravel `app()->make()` Si la clase no existe o no se logra resolver el laravel lanzará una excepcion. ```php public function __construct() { $this->config = config()->get('kafka'); try { $this->instanceClass = app()->make($this->config['handler_class']); } catch (\Exception $e) { throw new \Exception('You must configure class to receive messages from kafka before listen'); } } ``` ### kafkaHandlerService::handleMessage Luego que la respuesta es recibida se llama a cada uno de los métodos definidos CREATE, UPDATE y DELETE. Estos son interfaceados hacia la clase handler que definimos y se ejecutarán los métodos que mapeamos en la configuración de `config/kafka.php`. ```php public function handleMessage($message, $topic) { try { $dataMessage = $this->proccessAndGetData(json_decode($message, true), $topic); if (empty($dataMessage['before']) && !empty($dataMessage['after'])) // CREATE // $this->handleCreate($dataMessage['after'], $topic); if(!empty($dataMessage['before']) && !empty($dataMessage['after']))// UPDATE // $this->handleUpdate($dataMessage['before'],$dataMessage['after'] ,$topic); if(!empty($dataMessage['before']) && empty($dataMessage['after']))// DELETE $this->handleDelete($dataMessage['before'], $topic); } catch (\Exception $e) { echo $e->getMessage(); } } ``` Para manejar el CREATE, se mapea con `$this->config['event_methods']['create']` ```php protected function handleCreate($after, $topic) { $this->instanceClass->{$this->config['event_methods']['create']}($after, $topic); } ``` Para manejar el UPDATE, se mapea con `$this->config['event_methods']['update']` ```php protected function handleUpdate($before, $after, $topic) { $this->instanceClass->{$this->config['event_methods']['update']}($before, $after, $topic); } ``` Para manejar el DELETE, se mapea con `$this->config['event_methods']['delete']` ```php protected function handleDelete($before, $topic) { $this->instanceClass->{$this->config['event_methods']['delete']}($before, $topic); } ``` ## una consideración adicional Observemos como se recuperan los datos desde la funcion `KafkaService::handleMessage()` Observamos una construcción `if/else` dependiente de la configuración global de nuestro `.env` que bifurcará el código dependiendo de si la constante KAFKA_CONSUMER_BINARY está seteado con el valor "`kafka-avro-console-consumer`" ```php private function proccessAndGetData($data, $topic) { $returnData = []; if (env('KAFKA_CONSUMER_BINARY') == 'kafka-avro-console-consumer') { $returnData['before'] = isset($data['before']) ? $data['before'][$topic . '.Value'] : null; $returnData['after'] = isset($data['after']) ? $data['after'][$topic . '.Value'] : null; }else{ $returnData['before'] = isset($data['payload']['before']) ? $data['payload']['before'] : null; $returnData['after'] = isset($data['payload']['after']) ? $data['payload']['after'] : null; } return $returnData; } ``` Podemos observar que Si se ha decidido utilizar el formato de intercambio Avro, entonces la respuesta vendrá en la estructura: ```php $data['before'][topic.Value] $data['after'][topic.Value] ``` si por el contrario se ha usado el intercambio por defecto, vendrá en ```php $data['payload']['before'] $data['payload']['after'] ``` ### pero, que es avro? Avro es un marco de serialización de datos y llamadas de procedimiento remoto orientado a filas desarrollado dentro del proyecto Hadoop de Apache. Utiliza JSON para definir tipos de datos y protocolos, y serializa datos en un formato binario compacto. En el mundo de Kafka, Apache Avro es, con mucho, el protocolo de serialización más utilizado. Avro es un sistema de serialización de datos. Combinado con Kafka, proporciona serialización binaria rápida, robusta y basada en esquemas. ## LLamando al comando kafka Siguiendo con el trace , vemos que el comando `kafka-console-consumer/kafka-avro-console-consumer` es un shellscript de kafka el cual es llamado con el parámeotr `kafka.consumer.ConsoleConsumer` Esta llama a otro script el cual que ejecutará clases java `kafka-run-class.sh` y le pasa como argumento el nombre de la clase que quiere ejcutar, asi como todos los argumentos pasados al script `kafka-console-consumer/kafka-avro-console-consumer` Basicamente ambos comandos son una mascara para llamar a `kafka-run-class.sh kafka.consumer.ConsoleConsumer` ### `kafka-avro-console-consumer` v/s `kafka-console-consumer` Como podemos observar, `kafka-avro-console-consumer` es diferente de `kafka-console-consumer`, pero su diferencia radica principalmente en el formato de intercambio que se usará para recibir el stream de datos. ### ¿Qué es el consumidor de la consola Kafka Avro? `kafka-avro-console-consumer` es simplemente un `kafka-console-consumer` con un formateador avro (io.confluent.kafka.formatter.AvroMessageFormatter). Esta consola utiliza el convertidor Avro con el registro de esquema para leer correctamente el esquema de datos de Avro . Finalmente concluiremos que avro es una forma de validar el intercambio de datos con un sistema de tipos estático análogo a lo que puede ser jsonSchema o graphQL. ### kafka-console-consumer ```bash #!/bin/bash base_dir=$(dirname $0) export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" $base_dir/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@ ``` ### kafka-avro-console-consumer ```bash base_dir=$(dirname $0)/.. # Production jars export CLASSPATH=$CLASSPATH:$base_dir/share/java/kafka-serde-tools/* # Development jars. `mvn package` should collect all the required dependency jars here for dir in $base_dir/package-kafka-serde-tools/target/kafka-serde-tools-package-*-development; do export CLASSPATH=$CLASSPATH:$dir/share/java/kafka-serde-tools/* done DEFAULT_AVRO_FORMATTER="--formatter io.confluent.kafka.formatter.AvroMessageFormatter" DEFAULT_SCHEMA_REGISTRY_URL="--property schema.registry.url=http://localhost:8081" for OPTION in "$@" do case $OPTION in --formatter) DEFAULT_AVRO_FORMATTER="" ;; --*) ;; *) PROPERTY=$OPTION case $PROPERTY in schema.registry.url*) DEFAULT_SCHEMA_REGISTRY_URL="" ;; esac ;; esac done exec $(dirname $0)/schema-registry-run-class kafka.tools.ConsoleConsumer $DEFAULT_AVRO_FORMATTER $DEFAULT_SCHEMA_REGISTRY_URL "$@" ``` ## El comando `kafka-run-class.sh` Luego si nos vamos a kafka-run-class.sh, observamos varias validaciones de variables y rutas ```bash #!/bin/bash if [ $# -lt 1 ]; then echo "USAGE: $0 classname [opts]" exit 1 fi base_dir=$(dirname $0)/.. for file in $base_dir/project/boot/scala-2.8.0/lib/*.jar; do CLASSPATH=$CLASSPATH:$file done for file in $base_dir/core/target/scala_2.8.0/*.jar; do CLASSPATH=$CLASSPATH:$file done for file in $base_dir/core/lib/*.jar; do CLASSPATH=$CLASSPATH:$file done for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar; do if [ ${file##*/} != "sbt-launch.jar" ]; then CLASSPATH=$CLASSPATH:$file fi done if [ -z "$KAFKA_JMX_OPTS" ]; then KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " fi if [ -z "$KAFKA_OPTS" ]; then KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties" fi if [ $JMX_PORT ]; then KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT " fi if [ -z "$JAVA_HOME" ]; then JAVA="java" else JAVA="$JAVA_HOME/bin/java" fi $JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@ ``` Esto se sintetiza en que se ejecutará el siguiente comando: /bin/java -Xmx512M -server \ -Dcom.sun.management.jmxremote \ -Dlog4j.configuration=file:/path/to/custom/kafka-console-consumer-log4j.properties \ -cp [classpath] kafka.consumer.ConsoleConsumer \ --bootstrap-server 127.0.0.1:9092 \ --property schema.registry.url=http://127.0.0.1:8081 \ --topic topic_name Y esta es una llamada directa a una clase que ya es parte del motor del consumidor de kafka ## EL consumidor de kafka Finalmente hemos llegado a la clase de `scala` que hace el trabajo de consumir los datos. ```scala package kafka.consumer import scala.collection.mutable._ import scala.collection.JavaConversions._ import org.I0Itec.zkclient._ import joptsimple._ import org.apache.log4j.Logger import java.util.Arrays.asList import java.util.Properties import java.util.Random import java.io.PrintStream import kafka.message._ import kafka.utils.Utils import kafka.utils.ZkUtils import kafka.utils.StringSerializer /** * Consumer that dumps messages out to standard out. * */ object ConsoleConsumer { private val logger = Logger.getLogger(getClass()) def main(args: Array[String]) { val parser = new OptionParser val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.").withRequiredArg.describedAs("topic").ofType(classOf[String]) val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over.").withRequiredArg.describedAs("urls").ofType(classOf[String]) val groupIdOpt = parser.accepts("group", "The group id to consume on.").withRequiredArg.describedAs("gid").defaultsTo("console-consumer-" + new Random().nextInt(100000)).ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.").withRequiredArg.describedAs("size").ofType(classOf[java.lang.Integer]).defaultsTo(1024 * 1024) val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.").withRequiredArg.describedAs("size").ofType(classOf[java.lang.Integer]).defaultsTo(2 * 1024 * 1024) val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.").withRequiredArg.describedAs("class").ofType(classOf[String]).defaultsTo(classOf[NewlineMessageFormatter].getName) val messageFormatterArgOpt = parser.accepts("property").withRequiredArg.describedAs("prop").ofType(classOf[String]) val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +"start with the earliest message present in the log rather than the latest message.") val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms").withRequiredArg.describedAs("ms").ofType(classOf[java.lang.Integer]).defaultsTo(10*1000) val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.").withRequiredArg.describedAs("num_messages").ofType(classOf[java.lang.Integer]) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +"skip it instead of halt.") val options: OptionSet = tryParse(parser, args) checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt) val props = new Properties() props.put("groupid", options.valueOf(groupIdOpt)) props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString) props.put("fetch.size", options.valueOf(fetchSizeOpt).toString) props.put("auto.commit", "true") props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") props.put("zk.connect", options.valueOf(zkConnectOpt)) val config = new ConsumerConfig(props) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val topic = options.valueOf(topicIdOpt) val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 val connector = Consumer.create(config) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack if(!options.has(groupIdOpt)) tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt)) } }) var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0) val iter = if(maxMessages >= 0) stream.slice(0, maxMessages) else stream val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) try { for(message <- iter) { try { formatter.writeTo(message, System.out) } catch { case e => if (skipMessageOnError) logger.error("error processing message, skipping and resume consumption: " + e) else throw e } } } catch { case e => logger.error("error processing message, stop consuming: " + e) } System.out.flush() formatter.close() connector.shutdown() } def tryParse(parser: OptionParser, args: Array[String]) = { try { parser.parse(args : _*) } catch { case e: OptionException => { Utils.croak(e.getMessage) null } } } def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { for(arg <- required) { if(!options.has(arg)) { logger.error("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } } } def tryParseFormatterArgs(args: Iterable[String]): Properties = { val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) if(!splits.forall(_.length == 2)) { System.err.println("Invalid parser arguments: " + args.mkString(" ")) System.exit(1) } val props = new Properties for(a <- splits) props.put(a(0), a(1)) props } trait MessageFormatter { def writeTo(message: Message, output: PrintStream) def init(props: Properties) {} def close() {} } class NewlineMessageFormatter extends MessageFormatter { def writeTo(message: Message, output: PrintStream) { val payload = message.payload output.write(payload.array, payload.arrayOffset, payload.limit) output.write('\n') } } def tryCleanupZookeeper(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId logger.info("Cleaning up temporary zookeeper data under " + dir + ".") val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer) zk.deleteRecursive(dir) zk.close() } catch { case _ => // swallow } } } ```