---
tags: CS Master
---
KIP-691: Enhance Transactional Producer Exception Handling
===
:link: [KIP-691](https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling)
:bulb: I made a [slide](https://docs.google.com/presentation/d/14ZGC6YhRNYujCj6J6RnYlB3AOA0bjmvR-cKvegiqso8) for this KIP
:construction: more to edit....
## Background knowledge
* [Exactly-once Semantics in Apache Kafka](https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/)
* [Transactions in Apache Kafka](https://www.confluent.io/blog/transactions-apache-kafka/)
* [EOS KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)
### Exactly Once Semantics
Exactly-once delivery is a really hard problem.

Imagine a single-partition topic with one producer and one consumer:

* What if a broker fails?
* What if the producer-to-broker RPC fails?
* What if the client fails?

This is where idempotent operations come in: no matter how many times an operation is performed, it takes effect only once, and the original order is preserved.
The producer's send is idempotent when `enable.idempotence=true`; the principle is similar to TCP.
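
To make the TCP analogy concrete, here is a minimal sketch of sequence-number deduplication on a toy single-partition log. `ToyPartitionLog` and its methods are hypothetical names for illustration, not Kafka broker code, and the real protocol also tracks producer epochs and batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy single-partition log that drops retried records by (producerId, sequence).
// Illustration only: this is an assumption-laden model, not Kafka's implementation.
class ToyPartitionLog {
    private final List<String> records = new ArrayList<>();
    // Next sequence number expected from each producer id.
    private final Map<Long, Integer> nextSequence = new HashMap<>();

    /** Returns true if the record was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        int expected = nextSequence.getOrDefault(producerId, 0);
        if (sequence < expected) {
            // Already written: typically a retry after a lost acknowledgement.
            return false;
        }
        if (sequence > expected) {
            // A gap means an earlier record is missing, so ordering would break.
            throw new IllegalStateException(
                "out-of-order sequence " + sequence + ", expected " + expected);
        }
        records.add(value);
        nextSequence.put(producerId, expected + 1);
        return true;
    }

    List<String> records() {
        return records;
    }
}
```

Sending the same `(producerId, sequence)` pair twice leaves a single copy in the log, which is the "counts as once, order preserved" behaviour described above.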
### Transactions - Atomic writes across multiple partitions
* Either all messages in a batch eventually become visible to consumers, or none of them do.
* Consumer offsets can be sent along with the data that was processed, which allows end-to-end exactly-once semantics.
* Building on idempotency and atomicity, Kafka Streams offers exactly-once stream processing: `processing.guarantee=exactly_once`
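
As a rough sketch of the client settings these points rely on, the snippet below builds plain `java.util.Properties` objects. The class and method names are hypothetical, the transactional id is a placeholder, and a real client would also need connection and serializer settings that are omitted here.

```java
import java.util.Properties;

// Hypothetical helper that collects EOS-related client settings.
class EosConfigSketch {
    // Producer side: idempotent sends plus a transactional identity.
    static Properties producerProps(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("transactional.id", transactionalId);
        return props;
    }

    // Consumer side: only read messages from committed transactions, and
    // leave offset commits to the producer's transaction.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```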
A basic, but flawed, usage of the transactional producer:
```java
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}
```
(In fact, every one of these calls can throw an exception because it is part of a ***transaction***. You **need** to handle the exceptions.)
Kafka Producer transactional semantics since 0.11:
* `initTransactions()` to initialize the transactional producer identity
    * This is a **pretty** weird function, and the community is trying to get rid of it. Why do you need to call an init method before using the producer? It doesn't make sense.
* `beginTransaction()` to start a new transaction
* `sendOffsetsToTransaction()` to commit consumer offsets advanced within the current transaction
* `commitTransaction()` to commit the ongoing transaction
* `abortTransaction()` to abort the ongoing transaction
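
The call order implied by this list can be sketched as a tiny state machine. This is a simplified toy model: `ToyTransactionalProducerState` and its three states are assumptions made for illustration, and the real producer tracks more states (for example error and fenced states).

```java
// Toy model of the order in which the transactional calls are allowed.
// Hypothetical class; it does not talk to Kafka at all.
class ToyTransactionalProducerState {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;

    void initTransactions() {
        require(State.UNINITIALIZED, "initTransactions");
        state = State.READY;
    }

    void beginTransaction() {
        require(State.READY, "beginTransaction");
        state = State.IN_TRANSACTION;
    }

    void send() {
        require(State.IN_TRANSACTION, "send");
    }

    void sendOffsetsToTransaction() {
        require(State.IN_TRANSACTION, "sendOffsetsToTransaction");
    }

    void commitTransaction() {
        require(State.IN_TRANSACTION, "commitTransaction");
        state = State.READY; // ready for the next beginTransaction()
    }

    void abortTransaction() {
        require(State.IN_TRANSACTION, "abortTransaction");
        state = State.READY;
    }

    State state() {
        return state;
    }

    private void require(State expected, String call) {
        if (state != expected) {
            throw new IllegalStateException(call + " is not allowed in state " + state);
        }
    }
}
```

In this model, calling `beginTransaction()` before `initTransactions()` fails immediately, which is the ordering constraint the note above complains about.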
## Proposed changes
A new usage template for the transactional API makes EOS processing safer when handling a mix of fatal and non-fatal exceptions.
It separates transactional processing into two phases: the **data transmission phase** and the **commit phase**.
* Data transmission phase
```java
// Not final: it is assigned in both the try and the catch branch,
// which Java does not allow for a final local variable.
boolean shouldCommit;
try {
    producer.beginTransaction();
    // Do some processing and build the records we want to produce.
    List<ProducerRecord> processed = process(consumed);
    for (ProducerRecord record : processed)
        producer.send(record, (metadata, exception) -> {
            // Not required to capture the exception here.
        });
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
    shouldCommit = true;
} catch (Exception e) {
    // Catch any exception thrown from the data transmission phase.
    shouldCommit = false;
}
```
If everything succeeds, commit in the next phase. If the transaction fails, do not commit; abort the ongoing transaction instead.
* Commit phase
```java
try {
    if (shouldCommit) {
        producer.commitTransaction();
    } else {
        resetToLastCommittedPositions(consumer);
        producer.abortTransaction();
    }
} catch (CommitFailedException e) {
    // Transaction commit failed with an abortable error. The user can reset
    // the application state and resume with a new transaction. The root
    // cause is wrapped in the thrown exception.
    resetToLastCommittedPositions(consumer);
    producer.abortTransaction();
}
```
`CommitFailedException` wraps all non-fatal exceptions. Every other exception is fatal and calls for a controlled shutdown.
Before this change, `commitTransaction()` threw non-fatal exceptions in their raw form.
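
Putting the two phases together, the template can be sketched end to end without a Kafka dependency. Everything here is a stand-in: `ToyTxProducer` and `ToyCommitFailedException` are hypothetical types that only mimic the shape of the real producer and of `CommitFailedException`, and `resetState` stands in for `resetToLastCommittedPositions(consumer)`.

```java
import java.util.List;

// Hypothetical stand-in for the non-fatal, abortable commit failure.
class ToyCommitFailedException extends RuntimeException {
    ToyCommitFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical stand-in for the transactional producer calls used by the template.
interface ToyTxProducer {
    void beginTransaction();
    void send(String record);
    void commitTransaction(); // may throw ToyCommitFailedException (non-fatal)
    void abortTransaction();
}

class TwoPhaseTemplate {
    enum Outcome { COMMITTED, ABORTED }

    static Outcome processOnce(ToyTxProducer producer, List<String> records, Runnable resetState) {
        // Phase 1: data transmission. Any failure only flips the flag.
        boolean shouldCommit;
        try {
            producer.beginTransaction();
            for (String record : records) {
                producer.send(record);
            }
            shouldCommit = true;
        } catch (Exception e) {
            shouldCommit = false;
        }

        // Phase 2: commit or abort. Only the wrapped non-fatal failure is
        // caught; anything else propagates so the caller can shut down.
        try {
            if (shouldCommit) {
                producer.commitTransaction();
                return Outcome.COMMITTED;
            }
            resetState.run();
            producer.abortTransaction();
            return Outcome.ABORTED;
        } catch (ToyCommitFailedException e) {
            resetState.run();
            producer.abortTransaction();
            return Outcome.ABORTED;
        }
    }
}
```

The design point is that the caller needs only one `catch` clause for recoverable failures; any other exception escaping `processOnce` is treated as fatal.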
### Callback Exception Improvement
**In `producer.send()`:**
In an EOS setup, the callback is not required to handle the exception.
For non-EOS cases, the current exception mechanism is complicated because it surfaces raw exceptions. => ++The user needs to maintain a list of exception types to check against.++
**New exception type:** `ProduceFailedException`
```java
enum FailureType {
    MESSAGE_REJECTED,
    DELIVERY_FAILED,
    TRANSACTION_FAILED
}
```
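
To show how a failure type could replace a hand-maintained list of exception classes, here is a hedged sketch of a callback-side decision. The `ProduceFailedException` class below is a local stand-in whose constructor and `failureType()` accessor are assumptions, and the reaction chosen for each type is illustrative rather than something the KIP prescribes.

```java
enum FailureType {
    MESSAGE_REJECTED,
    DELIVERY_FAILED,
    TRANSACTION_FAILED
}

// Local stand-in for the proposed exception; the shape is an assumption.
class ProduceFailedException extends RuntimeException {
    private final FailureType type;

    ProduceFailedException(FailureType type, Throwable cause) {
        super(cause);
        this.type = type;
    }

    FailureType failureType() {
        return type;
    }
}

class CallbackHandling {
    // Maps the exception passed to a send callback to an illustrative action.
    static String decide(Exception exception) {
        if (exception == null) {
            return "ok";
        }
        if (exception instanceof ProduceFailedException) {
            switch (((ProduceFailedException) exception).failureType()) {
                case MESSAGE_REJECTED:
                    return "skip-record";
                case DELIVERY_FAILED:
                    return "retry-or-report";
                case TRANSACTION_FAILED:
                    return "abort-transaction";
            }
        }
        // Anything that is not wrapped is treated as fatal in this sketch.
        return "shutdown";
    }
}
```

A callback would then branch on one enum value instead of checking the exception against a list of raw types.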
## Summary: Public changes
As mentioned in the proposed changes section, the KIP makes the following public API changes:
* The `commitTransaction()` API will throw `CommitFailedException` to wrap non-fatal exceptions.
* All non-fatal exceptions thrown from the data transmission APIs will be wrapped as `KafkaException`, which will be documented clearly. This includes:
    * `beginTransaction`
    * `sendOffsetsToTransaction`
    * `send`

The `commitTransaction()` API will also throw only `CommitFailedException`, with the wrapped cause, when it hits a non-fatal exception, to simplify exception try-catching.