# 3. Kafka Producers: Writing Messages to Kafka

## 1. Creating a Producer
A Kafka producer has three mandatory properties:
- bootstrap.servers
  1. A list of brokers in host:port format.
  2. It does not need to include all brokers; the producer will discover the rest of the cluster from the ones given.
  3. At least two are recommended, to avoid a single point of failure.
- key.serializer
  1. Brokers expect byte arrays as the keys and values of messages.
  2. Set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface.
  3. The Kafka client package includes commonly used serializers such as ByteArraySerializer, StringSerializer, and IntegerSerializer.
- value.serializer
  1. Same as key.serializer, but applied to the message value.
Example:
```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
```
## 2. Three Ways to Send Messages
1. Fire-and-forget: as the name suggests, the message is sent and the result is ignored, so messages may be lost.
2. Synchronous send: send() returns a Future object; call get() on it to wait for the reply and confirm whether the send succeeded.
3. Asynchronous send: pass a callback function to send(); it is executed when a response is received from the Kafka broker.
```
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Fire-and-forget
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

// Synchronous
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

// Asynchronous
private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
```
Exceptions that can be thrown before the message is even sent to the broker:
- SerializationException: the message failed to serialize
- BufferExhaustedException: the producer's buffer is full
- TimeoutException: a timeout occurred (e.g., waiting for buffer space or metadata)
- InterruptException: the sending thread was interrupted
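A minimal sketch of handling these pre-send errors, reusing the producer and record types from the examples above (the class and method names here are just for illustration):
```
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;

public class PreSendErrorDemo {
    static void sendSafely(KafkaProducer<String, String> producer,
                           ProducerRecord<String, String> record) {
        try {
            producer.send(record);
        } catch (SerializationException e) {
            // key or value could not be serialized; retrying will not help
        } catch (BufferExhaustedException e) {
            // buffer.memory is full and max.block.ms elapsed
            // (must be caught before TimeoutException, which it extends)
        } catch (TimeoutException e) {
            // e.g., timed out waiting for partition metadata
        } catch (InterruptException e) {
            // the sending thread was interrupted
        }
    }
}
```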
## 3. Producer Configuration
- Key parameters (a configuration sketch follows this section):
  1. acks: how many partition replicas must acknowledge a write before the producer considers it successful. acks=0 means the producer does not wait for any response at all, acks=1 means it waits only for the leader's response, and acks=all means it waits for all in-sync replicas.
  2. max.block.ms: how long send() may block when the buffer is full or metadata is unavailable.
  3. delivery.timeout.ms: the total time budget from a record being ready to send until the broker responds or the producer gives up, including retries.
  4. request.timeout.ms: how long the producer waits for a reply to a single request.
  5. retries and retry.backoff.ms: how many times to retry a failed send, waiting 100 ms between retries by default (adjustable via retry.backoff.ms; it is worth first testing how long a crashed broker node takes to recover and tuning accordingly).
  6. linger.ms: how long to wait for additional messages before sending the current batch.
  7. buffer.memory: the size of the producer's send buffer.
  8. compression.type: messages are not compressed by default; enabling compression trades CPU against network speed and storage.
  9. batch.size: the amount of memory, in bytes, used for each batch per partition.
  10. client.id: any string, used to identify the message source.
  11. max.in.flight.requests.per.connection: how many unacknowledged requests may be in flight per connection.
  12. max.request.size: the maximum size of a produce request.
  13. receive.buffer.bytes and send.buffer.bytes: TCP socket buffer sizes (-1 uses the OS defaults).
  14. enable.idempotence: prevents retries from introducing duplicate messages.
- Since Apache Kafka 2.1, the time spent sending a message is divided into two intervals:
  1. The time until send() returns; until then the calling thread is blocked (bounded by max.block.ms).
  2. The time from send() returning successfully until the callback is triggered (bounded by delivery.timeout.ms).
  3. With a synchronous send, these two intervals cannot be separated.
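As referenced above, a minimal sketch of how several of these parameters might be set when constructing a producer; the values are illustrative assumptions, not recommendations:
```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                   // wait for all in-sync replicas
props.put("enable.idempotence", "true");    // avoid duplicates caused by retries
props.put("linger.ms", "5");                // wait up to 5 ms to fill a batch
props.put("batch.size", "32768");           // 32 KB batch per partition
props.put("compression.type", "snappy");    // trade CPU for network/storage
props.put("delivery.timeout.ms", "120000"); // total budget for send + retries
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```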
## 4. Serializers
- When the objects you send are not simple strings or integers, you can either use a generic serialization library such as Avro, Thrift, or Protobuf to create records, or build your own serializer.
- Using a generic serialization library is recommended.
```
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    /**
     * We are serializing Customer as:
     *   4 byte int representing customerId
     *   4 byte int representing length of customerName in UTF-8 bytes (0 if name is Null)
     *   N bytes representing customerName in UTF-8
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null) {
                return null;
            } else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException(
                "Error when serializing Customer to byte[] " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```
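A minimal sketch of plugging the custom serializer into a producer (the topic name "customers" and the sample key and Customer values are assumed examples):
```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", CustomerSerializer.class.getName());
KafkaProducer<String, Customer> producer = new KafkaProducer<>(props);
// the Customer value is turned into bytes by CustomerSerializer.serialize()
producer.send(new ProducerRecord<>("customers", "customer-1", new Customer(1, "Alice")));
```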
- Maintaining compatibility between old and new message formats is difficult, debugging across old and new versions of serializers and deserializers is difficult, and if there are multiple producers, every one of them must have its code modified.
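As a sketch of the recommended alternative: with a generic library such as Avro plus a Schema Registry, the producer only configures a serializer class and a registry URL. This assumes Confluent's KafkaAvroSerializer (from the kafka-avro-serializer package, not Apache Kafka itself), and the registry URL is a placeholder:
```
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// the Schema Registry stores the Avro schemas, so producers and
// consumers can evolve message formats compatibly
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
```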
## 5. Partitions
1. Kafka messages are key-value pairs, but it is possible to create a message with only a topic and a value.
2. The key serves two purposes: it is additional information stored with the message, and it determines which partition the message is sent to.
3. Messages with the same key are sent to the same partition; when the key is null, messages are spread across the available partitions.
4. Keys also play an important role in log compaction (see Chapter 6).
5. Partitioners: RoundRobinPartitioner and UniformStickyPartitioner; using UniformStickyPartitioner avoids overloading any single partition. A custom partitioner sketch follows the Python example below.
Examples of keyed and serialized sends using the kafka-python client:
```
import json
import msgpack
from kafka import KafkaProducer

# kafka-python connects to localhost:9092 by default
producer = KafkaProducer()
# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
```
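As mentioned in the partitioner list above, you can also implement the org.apache.kafka.clients.producer.Partitioner interface yourself. A sketch, assuming an application that wants to route one oversized customer ("Banana" is a made-up example key) to its own partition:
```
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null || !(key instanceof String)) {
            throw new IllegalArgumentException("All messages are expected to have a String key");
        }
        if ("Banana".equals(key)) {
            // the oversized customer always gets the last partition
            return numPartitions - 1;
        }
        // hash all other keys over the remaining partitions
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```
Register it through the producer's partitioner.class property, e.g. props.put("partitioner.class", BananaPartitioner.class.getName()).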