# 3. Kafka Producers: Writing Messages to Kafka

![Screenshot 2023-11-16 144028](https://hackmd.io/_uploads/B1PgqEQVp.png)

## 1. Creating a producer

A Kafka producer has three mandatory properties:

- bootstrap.servers
    1. A list of brokers in host:port format.
    2. It does not need to include every broker; the producer discovers the rest of the cluster from the brokers listed.
    3. At least two are recommended, so that a single broker going down is not a single point of failure.
- key.serializer
    1. Brokers expect byte arrays as the keys and values of messages.
    2. The name of a class that implements the org.apache.kafka.common.serialization.Serializer interface.
    3. The Kafka client package ships with common serializers such as ByteArraySerializer, StringSerializer, and IntegerSerializer.
- value.serializer
    1. Same as key.serializer.

Example:
```
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
```

## 2. Three ways to send messages

1. Fire-and-forget: as the name suggests, messages may be lost without anyone noticing.
2. Synchronous send: send() returns a Future object; calling get() on it tells you whether the write succeeded.
3. Asynchronous send: pass a callback to send(); it is invoked once the Kafka broker's response arrives.

```
// Fire-and-forget
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

// Synchronous
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

// Asynchronous
private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
```

Exceptions that send() can throw before the message ever reaches the broker:

- SerializationException: serialization failed
- BufferExhaustedException: the send buffer is full
- TimeoutException: the operation timed out
- InterruptException: the sending thread was interrupted

## 3. Producer configuration

- Parameters (pulled together in the sketch at the end of this section):
    1. acks: how many partition replicas must acknowledge a write before the producer considers it successful. acks=0 means do not wait for any response at all, acks=1 waits for the leader only, and acks=all waits for all in-sync replicas.
    2. max.block.ms: how long send() may block when the buffer is full or metadata is unavailable.
    3. delivery.timeout.ms: an upper bound on the total time from a record being ready to send until the broker responds or the producer gives up, retries included.
    4. request.timeout.ms: how long the producer waits for a reply to a single request.
    5. retries and retry.backoff.ms: by default the producer waits 100 ms between retries; retry.backoff.ms tunes this, and it is worth first testing how long a crashed node takes to recover before settling on values.
    6. linger.ms: how long to wait for additional messages before sending the current batch.
    7. buffer.memory: the amount of memory buffering records that are waiting to be sent.
    8. compression.type: messages are sent uncompressed by default; enabling compression is a trade-off between CPU, network bandwidth, and storage.
    9. batch.size: the amount of memory (in bytes) used per batch for each partition.
    10. client.id: any string; identifies the source of requests.
    11. max.in.flight.requests.per.connection: how many unacknowledged requests may be in flight at once.
    12. max.request.size: caps the size of a single produce request.
    13. receive.buffer.bytes and send.buffer.bytes: sizes of the TCP buffers used by the sockets.
    14. enable.idempotence: prevents retries from introducing duplicate messages.
- Since Kafka 2.1, the time spent sending a message is divided into two intervals:
    1. The time until send() returns; the thread calling send() is blocked during this interval.
    2. The time from send() returning successfully until the callback is triggered.
    3. With synchronous sends, the two intervals cannot be measured separately.
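A minimal sketch pulling several of these knobs together. The broker hostnames repeat the earlier example, and the specific values are illustrative starting points rather than recommendations:

```
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Durability: wait for all in-sync replicas to acknowledge each write.
props.put("acks", "all");
// Batching: wait up to 10 ms to fill batches of up to 32 KB before sending.
props.put("linger.ms", "10");
props.put("batch.size", "32768");
// Trade some CPU for network bandwidth and storage.
props.put("compression.type", "snappy");
// Upper bound on the whole send, retries included (illustrative value).
props.put("delivery.timeout.ms", "120000");
// Keep retries from introducing duplicates; requires acks=all.
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```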
![Screenshot 2023-11-16 144335](https://hackmd.io/_uploads/H1US9474p.png)

## 4. Serializers

- When what you send is not a simple string or integer, you can either use a generic serialization library such as Avro, Thrift, or Protobuf to create records, or write your own serializer.
- Generic serialization libraries are the recommended choice; the hand-rolled serializer below shows why.

```
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    /**
     * We are serializing Customer as:
     * 4 byte int representing customerId
     * 4 byte int representing length of customerName in UTF-8 bytes (0 if name is Null)
     * N bytes representing customerName in UTF-8
     */
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException(
                "Error when serializing Customer to byte[] " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```

- The catch: keeping old and new message formats compatible is hard, debugging across mismatched serializer and deserializer versions is hard, and if several producers share the format, every one of them has to change its code whenever the format changes.

## 5. Partitions

1. Kafka messages are key-value pairs, but you can create a message with only a topic and a value.
2. A key serves two purposes: it is additional information stored with the message, and it decides which partition the message is written to.
3. Messages with the same key go to the same partition; when the key is null, messages are balanced across the topic's available partitions.
4. Keys also play an important role in log compaction (see Chapter 6).
5. Built-in partitioners include RoundRobinPartitioner and UniformStickyPartitioner; UniformStickyPartitioner helps avoid overloading any single partition. (A custom-partitioner sketch follows the examples below.)

Python examples (these use the kafka-python client rather than the Java API):

```
from kafka import KafkaProducer
import json
import msgpack

producer = KafkaProducer(bootstrap_servers='broker1:9092')

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
```
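When neither built-in partitioner fits — say one hot key produces so much traffic that it deserves its own partition — you can implement the org.apache.kafka.clients.producer.Partitioner interface yourself. A minimal sketch, assuming a hypothetical hot key "Banana" and a topic with at least two partitions; every other key is hashed over the remaining partitions:

```
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages to have a String key");

        // The hot key always gets the last partition to itself...
        if (((String) key).equals("Banana"))
            return numPartitions - 1;

        // ...and everything else is hashed over the remaining partitions.
        // Masking with 0x7fffffff keeps the hash non-negative.
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % (numPartitions - 1);
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```

To activate it, point the producer's partitioner.class property at the class, e.g. props.put("partitioner.class", "com.example.BananaPartitioner") — the package name here is illustrative.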