Bài viết nằm trong series Apache Kafka từ zero đến one.
Trước khi đến với các concept khác trong Kafka, cùng thực hành với Kafka CLI trước cho đỡ quên lý thuyết. Let's begin.
Đầu tiên cần set up môi trường với Kafka stand-alone (single broker). Vì practice với command line nên mình không sử dụng Docker, vã cmd luôn cho tiện.
Đảm bảo đã cài đặt sẵn JDK trên máy nhé, thấp nhất là version 8.
Sau đó download Apache Kafka tại đây, hiện tại latest version là 2.8.0.
Giải nén và bắt đầu cuộc hành trình. Mở terminal, kiểm tra thư mục hiện tại, nếu thấy output như này là ok:
Tiếp theo là add PATH để thực thi cho các câu lệnh cho nhanh:
Thêm đường dẫn tới Apache Kafka vào cuối file:
Testart terminal hoặc thực hiện command để reload PATH:
Verify để chắc chắn đã add PATH thành công với câu lệnh:
Mình sử dụng Linux, nếu bạn sử dụng Windows thì search Edit system environment để thêm PATH nhé, sau đó cũng kiểm tra tương tự:
Bước tiếp theo, tạo folder data cho Zookeeper tại bất kì chỗ nào bạn muốn, hoặc tạo trong folder Kafka:
Sau đó sửa config để Zookeeper write snapshot data ra folder vừa tạo:
Tìm dòng config dataDir=/tmp/zookeeper
và thay thế bằng đường dẫn đến thư mục:
Có thể không cần làm bước này
, sử dụng default data folder tại
/tmp/zookeeper
.
Start Zookeeper:
Zookeeper start mặc định với port 2181, kiểm tra bằng cmd:
Thấy output như trên tức là start thành công, hoặc có thể thực hiện telnet để kiểm tra. Lúc này khi mở folder data zookeeper sẽ thấy folder version-2.
Làm tương tự như đã làm với Zookeeper, tạo folder data:
Sửa file config:
Thay log.dirs=/tmp/kafka-logs
thành:
Start Kafka:
Kafka start với port 9092, có thể kiểm tra bằng cmd như với Zookeeper và check folder data.
Không quá phức tạp, tiếp theo cùng practice với Kafka CLI - Command line interface.
Tạo topic name bất kì với 4 required options:
- –bootstrap-server: địa chỉ của Kafka server.
- –topic: topic name.
- –partitions: số lượng partitions muốn tạo.
- –replication-factor: số lượng replication factor cho mỗi partition. Lưu ý rằng không thể tạo quá số lượng broker trong mạng cluster. Vì việc 1 broker giữ đến 2 bản sau của partition hoàn toàn không có ý nghĩa, do đó với ví dụ này, replication factor chỉ có thể = 1.
Để list toàn bộ topic hiện có, sử dụng –list option:
Sử dụng –describe options trong trường hợp muốn xem nhiều thông số hơn về topic:
Dòng đầu tiên có các thông tin chung về topic như topic name, số lượng partition, replication-factor và các additional config.
Các dòng tiếp theo diễn tả về thông số của từng partition. Nếu để ý sẽ thấy cả 3 thông số Leader, Replicas và Isr đều bằng 0. Vì nó là broker id, không phải số lượng. Với các hệ thống có nhiều hơn 1 broker, con số này có thể khác.
Nếu muốn xóa topic, sử dụng option –delete:
Tạo topic thành công, bước tiếp theo là tạo producer và gửi message với 1 required option:
- –bootstrap-server: list địa chỉ Kafka broker dưới dạng host1:port1,host2:port2.
Nếu thấy output như trên tức là đã tạo thành công producer, tiến hành gửi vài message:
Ctrl + C để terminate producer. Ngoài ra còn có rất nhiều options khác bạn có thể tham khảo document hoặc sử dụng command sau để xem các options:
Ok, topic my-first-topic đã được tạo trước đó, vậy nếu tạo producer gửi message đến topic chưa tồn tại thì chuyện gì sẽ xảy ra, thử xem sao:
Không có lỗi gì, gửi message xem sao:
Đã có thông báo warning:
Gửi tiếp message khác xem thế nào:
Không còn warning, cũng không có error. Ngon nghẻ rồi, vậy trong trường hợp producer produce message đến topic chưa tồn tại, Kafka sẽ tự động tạo topic. Dẫn đến 2 question:
- Message đầu tiên gửi đến có thành công không?
- Nếu Kafka tự tạo topic thì số lượng partition và replication-factor là bao nhiêu?
Sử dụng option –describe để kiểm tra:
Chỉ có duy nhất một partition, vì không tạo trước nên Kafka sử dụng default config để tạo topic. Chúng ta có thể sửa default config này trong file server.properties:
Sửa num.partitions=1
thành giá trị default mong muốn:
Sau đó restart Kafka để apply config mới. Best practice là hãy tạo topic trước khi có ý định produce/consume message, nếu không Kafka sẽ sử dụng default config.
Phần này sẽ practice Kafka console consumer để verify message đã gửi ở phần trước cũng với 1 required option là –bootstrap-server.
… chẳng thấy message nào. Nếu start consumer như trên, chúng chỉ có thể consume message từ ngay sau thời điểm được tạo mà không consume các message cũ.
Mở một terminal khác và produce message đến topic này:
Lúc này kiểm tra consumer đã thấy nhận được message.
Vì sao consumer không đọc toàn bộ message từ đầu?
Thử tưởng tượng topic có vài trăm nghìn hoặc hàng triệu message, chúng ta sẽ mất kha khá thời gian xử lý hết trước khi đến message hiện tại. Lý do thứ hai là vì design của Kafka hướng đến hệ thống stream, real-time processing, vì vậy default không đọc lại toàn bộ message của topic.
Tuy nhiên cũng có một vài trường hợp muốn đọc toàn bộ message của Kafka. Để làm điều đó ta thêm option –from-begining:
Như vậy consumer đã consume đủ 8 message, nhưng thứ tự có vẻ không chính xác lắm, chả lẽ có bug…
Chắc chắn là chẳng có bug nào ở đây cả, với [bài trước] ta đã biết message chỉ đảm bảo thứ tự trong cùng một partition. Topic my-first-topic có đến 3 partition thì việc toàn bộ message không order là chuyện hết sức bình thường.
Thử lại với topic có 1 partition, chúng ta sẽ thấy toàn bộ các message được order đúng thứ tự như lúc produce.
Tiếp theo cùng practice với consumer group. Mở 2 terminal và tạo các consumer cùng group:
Tạo producer và produce message tới topic my-first-topic:
Lúc này sẽ thấy các message lần lượt được route đến các consumer khác nhau trong cùng consumer group. Trong quá trình produce message hoàn toàn có thể tạo thêm hoặc xóa bớt consumer trong cùng group mà vẫn đảm bảo message được consume bình thường.
Việc start một consumer trong consumer group chỉ khác với consumer thường là nó nằm trong một group… Nếu muốn đọc toàn bộ message của topic, chỉ cần thêm option –from-beginning. Thử với một group my-second-app:
Lúc này consumer sẽ consume toàn bộ message của topic từ đầu đến cuối. Thử start một consumer khác thuộc cùng group xem sao:
Không có message nào mặc dù đã có option –from-beginning, vấn đề là gì?
Bài trước mình đã đề cập đến, sau khi consume message, offset được commit về Kafka. Consumer trước đã đọc toàn bộ message, các consumer khác thuộc cùng group không thể consume các message cũ được.
Nếu coi 1 group là 1 application (Java, Golang, Nodejs… whatever), Kafka đã cung cấp cơ chế để consume lại toàn bộ message topic cho consumer group khi start và thực tế ít khi sử dụng. Do đó chẳng có lý do gì một application cần consume lại topic message nhiều lần.
Để kiểm tra các thông tin về consumer group, sử dụng kafka-consumer-groups với các options Kafka cung cấp.
List consumer group:
Thêm thêm các thông số của group với option –describe:
Nếu đã terminate toàn bộ các consumer, dòng đầu tiên hiện Consumer group 'my-second-app' has no active members
không có gì lạ. Hãy xem các thông số phía sau:
- Column
TOPIC
là topic name. Lưu ý rằng một consumer group có thể consume message từ nhiều topic khác nhau.- Column
PARTITION
là partition của topic.- Column
CURRENT-OFFSET
là offset đã xử lý cho đến thời điểm hiện tại của partition. Offset = 3 nghĩa là đã xử lý 3 message.- Column
LOG-END-OFFSET
là tổng số lượng message trong partition.- Column
LAG
là số lượng message chưa xử lý. Bằng 0 có nghĩa là đã xử lý hết message.
Produce thêm một vào message đến topic my-first-topic và describe lại my-second-app group, các thông số LOG-END-OFFSET
và LAG
sẽ thay đổi.
Sau đó, tiến hành start consumer trong my-second-app group, nó sẽ consume 3 message bị LAG
:
Nếu coi 1 group là 1 application (Java, Golang, Nodejs… whatever), Kafka đã cung cấp cơ chế để consume lại toàn bộ message topic cho consumer group khi start và thực tế ít khi sử dụng. Do đó chẳng có lý do gì một application cần consume lại topic message nhiều lần.
Chém vậy chứ cũng có những trường hợp consumer group cần consume lại message từ topic, không phải toàn bộ message thì là vài hoặc một message.
Ví dụ như crash application hoặc cần đọc lại vài message trước đó để xử lý business logic.
Resetting offset sẽ giúp chúng ta xử lý vấn đề trên.
Sử dụng các option –reset-offsets với kafka-consumer-groups để reset offset cho 1 hoặc nhiều topic:
Toàn bộ offset của các partition đã được reset về 0. Restart consumer, expect consumer sẽ consume lại tất cả message từ đầu:
Nếu muốn reset offset cho consumer group với toàn bộ topic, sử dụng –all-topics thay vì specific topic name:
Ngoài ra còn nhiều options khác để reset chính xác đến vị trí offset, tiến lên hoặc lùi xuống sử dụng option –shift-by {value}:
- Value > 0 là tiến lên.
- Value < 0 là lùi xuống.
Cơ bản như vậy là đủ dùng, nếu muốn đọc thêm về các options có thể sử dụng –help:
Việc sử dụng Kafka CLI giúp có cái nhìn tổng quan và practice với Kafka core: topic, offset, partition, producer, consumer…
Phần việc sử dụng CLI và monitoring dành cho DevOps hoặc SysAdmin. Chúng ta focus chính vào việc gửi nhận message, define logic code, thỉnh thoảng vào check Kafka UI debug cho vui.
Việc connect thế nào, produce/consume message ra sao có framework lo hết rồi. Nếu sử dụng Spring thì chỉ cần thêm thắt config, add annotation là xong hết. Tuy nhiên cũng chính sự tiện lợi đó sẽ đánh mất cơ hội để chúng ta tìm hiểu sâu và kĩ hơn về những thứ đang làm.
Ngoài Kafka CLI, một tool khác mà DevOps hay sử dụng để monitor hoặc debug Kafka là KafkaCat, các bạn có thể tìm hiểu thêm tại đây.
Một lưu ý nữa, Kafka không cung cấp GUI, dùng terminal cho chuyên nghiệp. Với dân không chuyên như mình thì vẫn khoái dùng UI cho tiện.
- Doanh nghiệp lớn sẽ sử dụng các Enterprise tool như Conduktor hoặc Offset explorer.
- Open-source thì có Kafka UI hoặc Kafdrop.
© Dat Bui