---
tags: CS Master
---

KIP-691: Enhance Transactional Producer Exception Handling
===

:link: [KIP-691](https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling)

:bulb: I made a [slide](https://docs.google.com/presentation/d/14ZGC6YhRNYujCj6J6RnYlB3AOA0bjmvR-cKvegiqso8) for this KIP

:construction: more to edit....

## Background knowledge

* [Exactly-once Semantics in Apache Kafka](https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/)
* [Transactions in Apache Kafka](https://www.confluent.io/blog/transactions-apache-kafka/)
* [EOS KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)

### Exactly Once Semantics

Exactly-once is a really hard problem.

![](https://i.imgur.com/OGoajVa.png)

Imagine a single-partition topic, a producer, and a consumer:

![](https://i.imgur.com/PhMoyds.png)

* What if a broker fails?
* What if the producer-to-broker RPC fails?
* What if the client fails?

This is where idempotent operations come in: no matter how many times an operation is performed, it takes effect only once, and the original order is preserved.

With `enable.idempotence=true`, producer sends are idempotent. The principle is similar to TCP: each batch carries a producer ID and a sequence number, so the broker can detect and discard duplicates caused by retries.

### Transactions: Atomic writes across multiple partitions

* A batch of messages is either eventually visible to the consumer as a whole, or none of it is visible.
* Consumer offsets can also be committed in the same transaction as the data you have processed, allowing end-to-end exactly-once semantics.
* Building on idempotency and atomicity, Kafka Streams offers exactly-once stream processing via `processing.guarantee=exactly_once`.

A bad but basic usage of the transactional producer:

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with the same transactional.id has taken over.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so that the transaction's writes are discarded.
    producer.abortTransaction();
}
```

(Actually, every one of these calls can throw an exception, because it is a ***transaction***.
You **need** to handle the exceptions.)

Kafka Producer transactional semantics since 0.11:

* `initTransactions()` to initialize the transactional producer's identity
    * This is a **PRETTY** weird function, and the community is trying to get rid of it. Why do you need to initialize before use? It doesn't make sense.
* `beginTransaction()` to start a new transaction
* `sendOffsetsToTransaction()` to commit consumer offsets advanced within the current transaction
* `commitTransaction()` to commit the ongoing transaction
* `abortTransaction()` to abort the ongoing transaction

## Proposed changes

A new transactional API usage template that makes EOS processing safer when handling a mix of fatal and non-fatal exceptions.

Separate the transactional processing into two phases: the **data transmission phase** and the **commit phase**.

* Data transmission phase:

```java
// process(), consumed and consumedOffsets are application-defined.
boolean shouldCommit; // not final: it is assigned in both the try block and the catch block
try {
    producer.beginTransaction();

    // Do some processing and build the records we want to produce.
    List<ProducerRecord> processed = process(consumed);
    for (ProducerRecord record : processed) {
        producer.send(record, (metadata, exception) -> {
            // Not required to capture the exception here.
        });
    }
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    shouldCommit = true;
} catch (Exception e) {
    // Catch any exception thrown from the data transmission phase.
    shouldCommit = false;
}
```

If all goes well, commit in the next phase. If the transaction failed, do not commit; abort the ongoing transaction instead.

* Commit phase:

```java
// resetToLastCommittedPositions() is an application-defined helper.
try {
    if (shouldCommit) {
        producer.commitTransaction();
    } else {
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    }
} catch (CommitFailedException e) {
    // Transaction commit failed with an abortable error; the user can reset
    // the application state and resume with a new transaction. The root
    // cause is wrapped in the thrown exception.
    resetToLastCommittedPositions(consumer);
    producer.abortTransaction();
}
```

`CommitFailedException` wraps all non-fatal exceptions.
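To check the control flow of the two-phase template without a broker, here is a minimal, self-contained simulation. Everything in it is hypothetical: `TxnProducer` is a made-up, simplified interface (not `KafkaProducer`), the local `CommitFailedException` only stands in for the exception the KIP proposes, `FatalTxnException` plays the role of a fatal error such as a fenced producer, and `FakeProducer` throws synchronously from `send()`, whereas a real producer usually reports send errors asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the CommitFailedException proposed by KIP-691
// (a local class so the sketch compiles without Kafka on the classpath).
class CommitFailedException extends RuntimeException {
    CommitFailedException(Throwable cause) { super(cause); }
}

// Hypothetical fatal error, playing the role of e.g. a fenced producer.
class FatalTxnException extends RuntimeException {
    FatalTxnException(String message) { super(message); }
}

// Hypothetical, simplified transactional producer (not the real KafkaProducer API).
interface TxnProducer {
    void beginTransaction();
    void send(String record);
    void sendOffsetsToTransaction(long offset);
    void commitTransaction(); // throws CommitFailedException on abortable errors
    void abortTransaction();
}

class TwoPhaseTemplate {
    enum Outcome { COMMITTED, ABORTED }

    // One iteration of the template; fatal exceptions propagate to the caller.
    static Outcome runOnce(TxnProducer producer, List<String> records, long offset,
                           Runnable resetToLastCommittedPositions) {
        boolean shouldCommit;
        // Phase 1: data transmission. Any exception only means "do not commit".
        try {
            producer.beginTransaction();
            for (String record : records) {
                producer.send(record);
            }
            producer.sendOffsetsToTransaction(offset);
            shouldCommit = true;
        } catch (Exception e) {
            shouldCommit = false;
        }
        // Phase 2: commit or abort. Only CommitFailedException is treated as abortable.
        try {
            if (shouldCommit) {
                producer.commitTransaction();
                return Outcome.COMMITTED;
            }
            resetToLastCommittedPositions.run();
            producer.abortTransaction();
            return Outcome.ABORTED;
        } catch (CommitFailedException e) {
            resetToLastCommittedPositions.run();
            producer.abortTransaction();
            return Outcome.ABORTED;
        }
    }
}

// Fake producer that records its calls and can inject failures.
class FakeProducer implements TxnProducer {
    final List<String> log = new ArrayList<>();
    boolean failSend;   // throw from send()
    boolean failCommit; // abortable error at commit time
    boolean fenced;     // fatal error from beginTransaction() and abortTransaction()

    private void checkFenced() {
        if (fenced) throw new FatalTxnException("producer fenced");
    }

    public void beginTransaction() { checkFenced(); log.add("begin"); }

    public void send(String record) {
        if (failSend) throw new RuntimeException("send failed");
        log.add("send:" + record);
    }

    public void sendOffsetsToTransaction(long offset) { log.add("offsets:" + offset); }

    public void commitTransaction() {
        if (failCommit) throw new CommitFailedException(new RuntimeException("abortable error"));
        log.add("commit");
    }

    public void abortTransaction() { checkFenced(); log.add("abort"); }
}
```

For example, a `FakeProducer` with `failCommit = true` begins, sends, adds offsets, hits the abortable commit error, resets its positions and aborts, so `runOnce` returns `ABORTED`; with `fenced = true`, the fatal exception escapes `runOnce`, which is the cue for a controlled shutdown.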
All other exceptions are fatal; just perform a controlled shutdown.

Before this change, `commitTransaction()` throws non-fatal exceptions in their raw form.

### Callback Exception Improvement

**In producer.send():**

In an EOS setup, it is not required to handle the exception in the callback (because a failed send also makes the transaction fail, and that failure surfaces when committing it).

For non-EOS cases, the current exception mechanism is complicated, as it throws raw exceptions. => ++The user needs to maintain a list of exception types to check against.++

**New exception type:** `ProduceFailedException`

```java
enum FailureType {
    MESSAGE_REJECTED,
    DELIVERY_FAILED,
    TRANSACTION_FAILED
}
```

## Summary: Public changes

As mentioned in the proposed changes section, we would be making the following public API changes:

* The `commitTransaction()` API will throw `CommitFailedException` to wrap non-fatal exceptions.
* All the non-fatal exceptions thrown from data transmission APIs will be wrapped as `KafkaException`, which will be documented clearly. This includes:
    * `beginTransaction`
    * `sendOffsetsToTransaction`
    * `send`

We would also let the `commitTransaction` API throw only `CommitFailedException`, with the wrapped cause, when hitting non-fatal exceptions, to simplify the exception try-catching.
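The *Callback Exception Improvement* for non-EOS producers can be sketched as follows. It assumes a local `ProduceFailedException` carrying the proposed `FailureType`; this is not a class in a released Kafka client, and the reaction chosen for each failure type (drop, retry, abort) is only our reading of the enum names, not something the KIP specifies. In a real client, this logic would live inside `Callback.onCompletion(RecordMetadata metadata, Exception exception)`.

```java
// Hypothetical stand-in for the ProduceFailedException proposed by KIP-691;
// it is NOT a class in any released Kafka client.
class ProduceFailedException extends RuntimeException {
    enum FailureType { MESSAGE_REJECTED, DELIVERY_FAILED, TRANSACTION_FAILED }

    private final FailureType failureType;

    ProduceFailedException(FailureType failureType, Throwable cause) {
        super(failureType.name(), cause); // the root cause stays reachable via getCause()
        this.failureType = failureType;
    }

    FailureType failureType() {
        return failureType;
    }
}

// Hypothetical non-EOS callback logic: one switch on the failure type replaces
// a hand-maintained list of raw exception classes.
class SendFailureClassifier {
    // Illustrative reactions; the KIP does not prescribe them.
    enum Action { NONE, DROP_RECORD, RETRY_SEND, ABORT_TRANSACTION, UNKNOWN }

    static Action classify(Exception exception) {
        if (exception == null) {
            return Action.NONE; // the record was delivered
        }
        if (!(exception instanceof ProduceFailedException)) {
            return Action.UNKNOWN; // not wrapped: fall back to application-specific handling
        }
        switch (((ProduceFailedException) exception).failureType()) {
            case MESSAGE_REJECTED:
                return Action.DROP_RECORD;       // the record itself was refused
            case DELIVERY_FAILED:
                return Action.RETRY_SEND;        // delivery might succeed if attempted again
            case TRANSACTION_FAILED:
                return Action.ABORT_TRANSACTION; // the enclosing transaction cannot commit
            default:
                return Action.UNKNOWN;
        }
    }
}
```

The design point is that the callback branches on a single enum instead of checking a growing list of raw exception classes, while the original error is still available through `getCause()`.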