# Spring Boot + Apache Kafka Course - The Practical Guide ###### tags: `Udemy-Java Guides` [Course Link](https://www.udemy.com/course/spring-boot-and-apache-kafka/learn/lecture/32151354#overview) ![](https://i.imgur.com/NqiQhsc.png) ![](https://i.imgur.com/LUHrBle.png) ![](https://i.imgur.com/69X8976.png) ![](https://i.imgur.com/ESxGRiN.png) ![](https://i.imgur.com/r4lFfQ0.png) ![](https://i.imgur.com/u91LBkA.png) ![](https://i.imgur.com/M3kdfaL.png) ![](https://i.imgur.com/AtV3L28.png) ![](https://i.imgur.com/ymdcFYo.png) * start the zookeeper service > C:\kafka_2.13-3.2.1> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties * start the kafka broker service > C:\kafka_2.13-3.2.1> .\bin\windows\kafka-server-start.bat .\config\server.properties * create a topic to store events > C:\kafka_2.13-3.2.1> .\bin\windows\kafka-topics.bat --create --topic topic-example --bootstrap-server localhost:9092 * write some events into the topic > C:\kafka_2.13-3.2.1> .\bin\windows\kafka-console-producer.bat --topic topic-example --bootstrap-server localhost:9092 > hello world kafka topic event demo 1 * read the events > C:\kafka_2.13-3.2.1> .\bin\windows\kafka-console-consumer.bat --topic topic-example --from-beginning --bootstrap-server localhost:9092 > hello world kafka topic event demo 1 ## Spring Boot + Kafka Producer and Consumer for String message ### Configure Kafka Producer and Consumer ```shell= spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.bootstrap-servers=localhost:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer ``` ![](https://i.imgur.com/Fsy3LMZ.png) ### Create KafkaConfig class ```java= @Configuration public class KafkaTopicConfig { @Bean public NewTopic javaguidesTopic() { return TopicBuilder.name("javaguides").build(); } } ``` ### Create Producer class ```java= @Service @Log4j2 public class KafkaProducer { private KafkaTemplate<String, String> kafkaTemplate; @Autowired public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String message) { log.info("message sent {}", message); kafkaTemplate.send("javaguides", message); } } ``` ### Create Rest API to send message ```java= @RestController @RequestMapping("/api/v1/kafka") public class MessageController { private KafkaProducer kafkaProducer; @Autowired public void setKafkaProducer(KafkaProducer kafkaProducer) { this.kafkaProducer = kafkaProducer; } // http://localhost:8080/api/v1/kafka/publish?message=hello world @GetMapping("/publish") public ResponseEntity<String> publish(@RequestParam("message") String message) { kafkaProducer.sendMessage(message); return ResponseEntity.ok("Message sent to the topic"); } } ``` #### read the events ```sh= .\bin\windows\kafka-console-consumer.bat --topic javaguides --from-beginning --bootstrap-server localhost:9092 ``` ### Create Consumer class ```java= @Service @Log4j2 public class KafkaConsumer { @KafkaListener(topics = "javaguides", groupId = "myGroup") public void consume(String message) { log.info("Message received -> {}", message); } } ``` ## Spring Boot + Kafka Producer and Consumer for JSON message ![](https://i.imgur.com/ou5LCod.png) ### Configure Producer and Consumer for Json Serializer and Deserializer ```shell= spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spinrg.json.trusted.packages=* spring.kafka.producer.bootstrap-servers=localhost:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer #spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer ``` ### Create Simple Pojo to serialize and deserialize ```java= @Data public class User { private Integer id; private String firstName; private String lastName; } ``` ### Create Kafka Producer to produce JSON message ```java= @Service @Log4j2 public class JsonKafkaProducer { private KafkaTemplate<String, User> kafkaTemplate; @Autowired public void setKafkaTemplate(KafkaTemplate<String, User> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(User user) { log.info("Message sent -> {}", user); Message<User> message = MessageBuilder .withPayload(user) .setHeader(KafkaHeaders.TOPIC, "javaguides") .build(); kafkaTemplate.send(message); } } ``` ### Create Rest API to send Json Object ```java= @RestController @RequestMapping("api/v1/kafka") public class JsonMessageController { private JsonKafkaProducer jsonKafkaProducer; @Autowired public void setJsonKafkaProducer(JsonKafkaProducer jsonKafkaProducer) { this.jsonKafkaProducer = jsonKafkaProducer; } @PostMapping("/publish") public ResponseEntity<String> publish(@RequestBody User user) { jsonKafkaProducer.sendMessage(user); return ResponseEntity.ok("JSON message sent to kafka topic"); } } ``` #### Create new topic for json ```java= @Configuration public class KafkaTopicConfig { @Bean public NewTopic javaguidesTopic() { return TopicBuilder.name("javaguides").build(); } @Bean public NewTopic javaguidesJsonTopic() { return TopicBuilder.name("javaguides_json").build(); } } ``` ### Create Kafka Consumer to consume JSON message ```java= @Service @Log4j2 public class JsonKafkaConsumer { @KafkaListener(topics = "javaguides_json", groupId = "myGroup") public void consume(User user) { log.info("Json message received -> {}", user); } } ``` ### Refactor code to externalize the topic name ## Real world project - Kafka Producer Wikimedia ![](https://i.imgur.com/bBt8tX7.png) ![](https://i.imgur.com/Z91jtnt.png) ### split into multi module * root pom.xml ```xml= <packaging>pom</packaging> ``` * kafka-producer-wikimedia pom.xml ```xml= <packaging>jar</packaging> ``` ```java= @SpringBootApplication public class SpringBootProducerApplication { public static void main(String[] args) { SpringApplication.run(SpringBootProducerApplication.class); } } ``` ### Configure Wikimedia Producer and Create a topic ```shell= spring.kafka.producer.bootstrap-servers=localhost:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer ``` ```java= @Configuration public class KafkaTopicConfig { @Bean public NewTopic topic() { return TopicBuilder.name("wikimedia_recentchange").build(); } } ``` ### Create WikimediaChangeHandler class ```java= @Log4j2 @Component public class WikimediaChangeHandler implements EventHandler { private KafkaTemplate<String, String> kafkaTemplate; private final String topic = "wikimedia_recentchange"; @Autowired public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @Override public void onOpen() throws Exception { } @Override public void onClosed() throws Exception { } @Override public void onMessage(String s, MessageEvent messageEvent) throws Exception { log.info("event data -> {}", messageEvent.getData()); kafkaTemplate.send(topic, messageEvent.getData()); } @Override public void onComment(String s) throws Exception { } @Override public void onError(Throwable throwable) { } } ``` ### Create WikiMediaChangesProducer class ```java= @Service @Log4j2 public class WikiMediaChangesProducer { private KafkaTemplate<String, String> kafkaTemplate; private EventHandler eventHandler; @Autowired public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @Autowired public void setEventHandler(EventHandler eventHandler) { this.eventHandler = eventHandler; } public void sendMessage() throws InterruptedException { String url = "https://stream.wikimedia.org/v2/stream/recentchange"; // to read real time stream data from wikimedia, we use event source EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url)); EventSource eventSource = builder.build(); eventSource.start(); TimeUnit.MINUTES.sleep(10); } } ``` ### SpringBootProducerApplication implements CommandLineRunner ```java= @SpringBootApplication public class SpringBootProducerApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringBootProducerApplication.class); } private WikiMediaChangesProducer wikiMediaChangesProducer; @Autowired public void setWikiMediaChangesProducer(WikiMediaChangesProducer wikiMediaChangesProducer) { this.wikiMediaChangesProducer = wikiMediaChangesProducer; } @Override public void run(String... args) throws Exception { wikiMediaChangesProducer.sendMessage(); } } ``` ## Real world project - Kafka Consumer Database ```xml= <packaging>jar</packaging> ``` ```java= @SpringBootApplication public class SpringBootConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringBootConsumerApplication.class); } } ``` ### Configure Kafka Consumer in application.properties File ```shell= spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer ``` ### Kafka Consumer Implementation ```java= @Service @Log4j2 public class KafkaDatabaseConsumer { @KafkaListener(topics = "wikimedia_recentchange", groupId = "myGroup") public void consume(String eventMessage) { log.info("Message received -> {}", eventMessage); } } ``` ### Configure MySQL Database > add dependencies in consumer pom.xml ```xml= <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> ``` ```shell= spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia spring.datasource.username=root spring.datasource.password=root spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect spring.jpa.hibernate.ddl-auto=update spring.jpa.properties.hibernate.show_sql=true spring.jpa.properties.hibernate.use_sql_comments=true spring.jpa.properties.hibernate.format_sql=true ``` ### Save Wikimedia Data into MySQL Database ```java= @Entity @Table(name = "wikimedia_recentchange") @Getter @Setter public class WikimediaData { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @Lob private String wikiEventData; } ``` ```java= @Repository public interface WikimediaDataRepository extends JpaRepository<WikimediaData, Long> { } ``` ```java= @Service @Log4j2 public class KafkaDatabaseConsumer { private WikimediaDataRepository wikimediaDataRepository; @Autowired public void setWikimediaDataRepository(WikimediaDataRepository wikimediaDataRepository) { this.wikimediaDataRepository = wikimediaDataRepository; } @KafkaListener(topics = "wikimedia_recentchange", groupId = "myGroup") public void consume(String eventMessage) { log.info("Message received -> {}", eventMessage); WikimediaData wikimediaData = new WikimediaData(); wikimediaData.setWikiEventData(eventMessage); wikimediaDataRepository.save(wikimediaData); } } ``` ## Event Driven Architecture ### What is Event Driven Architecture ![](https://i.imgur.com/JSCpzrK.png) ![](https://i.imgur.com/4LmtBvL.png) ### How Event Driven Architecture Works and It's Advantages ![](https://i.imgur.com/9haGlNP.png) ## Event-Driven Microservices Using Spring Boot and Apache Kafka ![](https://i.imgur.com/g915YVK.png) ### Create 4 Microservices - OrderService, StockService, EmailService & Base-Domains > order service, stock service, email service and base domains ### Base-Domains Microservice - Create DTO Classes - Order and OrderEvent ```java= @Data public class Order { private String orderId; private String name; private Integer quantity; private BigDecimal price; } ``` ```java= @Data public class OrderEvent { private String message; private String status; private Order order; } ``` ### OrderService Microservice - Configure Kafka Producer ```shell= spring.kafka.producer.bootstrap-servers=localhost:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.topic.name=order_topics ``` ### OrderService Microservice - Configure Kafka Topic ```java= @Configuration public class KafkaTopicConfig { @Value("${spring.kafka.topic.name}") private String topicName; // spring bean for kafka topic @Bean public NewTopic topic() { return TopicBuilder.name(topicName).build(); } } ``` ### OrderService Microservice - Create Kafka Producer ![](https://i.imgur.com/UjK9Lmz.png) > add base-domains project as dependency to order-service project ```xml= <dependency> <groupId>com.example</groupId> <artifactId>base-domains</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> ``` ```java= @Service @Log4j2 public class OrderProducer { private NewTopic topic; private KafkaTemplate<String, OrderEvent> kafkaTemplate; @Autowired public void setTopic(NewTopic topic) { this.topic = topic; } @Autowired public void setKafkaTemplate(KafkaTemplate<String, OrderEvent> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(OrderEvent orderEvent) { log.info("Order event => {}", orderEvent); // create Message Message<OrderEvent> message = MessageBuilder .withPayload(orderEvent) .setHeader(KafkaHeaders.TOPIC, topic.name()) .build(); kafkaTemplate.send(message); } } ``` ### OrderService Microservice - Create REST API to Send Order & Test Kafka Producer ```java= @RestController @RequestMapping("/api/v1") public class OrderController { private OrderProducer orderProducer; @Autowired public void setOrderProducer(OrderProducer orderProducer) { this.orderProducer = orderProducer; } @PostMapping("/orders") public String placeOrder(@RequestBody Order order) { order.setOrderId(UUID.randomUUID().toString()); OrderEvent orderEvent = new OrderEvent(); orderEvent.setStatus("PENDING"); orderEvent.setMessage("order status is pending"); orderEvent.setOrder(order); orderProducer.sendMessage(orderEvent); return "order placed successfully"; } } ``` ### StockService Microservice - Configure Kafka Consumer ![](https://i.imgur.com/xt8Jmf2.png) ```shell= server.port=8081 spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=stock spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.topic.name=order_topics ``` ### StockService Microservice - Create Kafka Consumer ```java= @Service @Log4j2 public class OrderConsumer { private OrderLogRepository orderLogRepository; @Autowired public void setOrderLogRepository(OrderLogRepository orderLogRepository) { this.orderLogRepository = orderLogRepository; } @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}") public void consume(OrderEvent orderEvent) { log.info("Order event received in stock service => {}", orderEvent); // save the order event into database OrderLog orderLog = new OrderLog(); orderLog.setMessage(orderEvent.getMessage()); orderLog.setStatus(orderEvent.getStatus()); orderLog.setOrderId(orderEvent.getOrder().getOrderId()); orderLog.setOrderName(orderEvent.getOrder().getName()); orderLog.setOrderQuantity(orderEvent.getOrder().getQuantity()); orderLog.setOrderPrice(orderEvent.getOrder().getPrice()); orderLogRepository.save(orderLog); log.info("order event received => {}", orderEvent); log.info("order log saved => {}", orderLog); } } ``` ```java= @Entity @Table(name = "order_log") @Getter @Setter @ToString public class OrderLog { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @Column(name = "message") private String message; @Column(name = "status") private String status; @Column(name = "order_id") private String orderId; @Column(name = "order_name") private String orderName; @Column(name = "order_quantity") private Integer orderQuantity; @Column(name = "order_price") private BigDecimal orderPrice; } ``` ```java= @Repository public interface OrderLogRepository extends JpaRepository<OrderLog, Long> { } ``` ```shell= server.port=8081 spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=stock spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.topic.name=order_topics spring.datasource.url=jdbc:mysql://localhost:3306/stock spring.datasource.username=root spring.datasource.password=root spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect spring.jpa.hibernate.ddl-auto=update spring.jpa.properties.hibernate.show_sql=true spring.jpa.properties.hibernate.use_sql_comments=true spring.jpa.properties.hibernate.format_sql=true ``` ### EmailService Microservice - Configure and Create Kafka Consumer ```shell= server.port=8082 spring.kafka.consumer.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=email spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.topic.name=order_topics ```