# Spring Boot + Apache Kafka Course - The Practical Guide
###### tags: `Udemy-Java Guides`
[Course Link](https://www.udemy.com/course/spring-boot-and-apache-kafka/learn/lecture/32151354#overview)









* start the zookeeper service
> C:\kafka_2.13-3.2.1> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
* start the kafka broker service
> C:\kafka_2.13-3.2.1> .\bin\windows\kafka-server-start.bat .\config\server.properties
* create a topic to store events
> C:\kafka_2.13-3.2.1> .\bin\windows\kafka-topics.bat --create --topic topic-example --bootstrap-server localhost:9092
* write some events into the topic
> C:\kafka_2.13-3.2.1> .\bin\windows\kafka-console-producer.bat --topic topic-example --bootstrap-server localhost:9092
> hello world
kafka topic event
demo 1
* read the events
> C:\kafka_2.13-3.2.1> .\bin\windows\kafka-console-consumer.bat --topic topic-example --from-beginning --bootstrap-server localhost:9092
> hello world
kafka topic event
demo 1
## Spring Boot + Kafka Producer and Consumer for String message
### Configure Kafka Producer and Consumer
```shell=
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

### Create KafkaConfig class
```java=
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic() {
return TopicBuilder.name("javaguides").build();
}
}
```
### Create Producer class
```java=
@Service
@Log4j2
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
log.info("message sent {}", message);
kafkaTemplate.send("javaguides", message);
}
}
```
### Create Rest API to send message
```java=
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {
private KafkaProducer kafkaProducer;
@Autowired
public void setKafkaProducer(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
// http://localhost:8080/api/v1/kafka/publish?message=hello world
@GetMapping("/publish")
public ResponseEntity<String> publish(@RequestParam("message") String message) {
kafkaProducer.sendMessage(message);
return ResponseEntity.ok("Message sent to the topic");
}
}
```
#### read the events
```sh=
.\bin\windows\kafka-console-consumer.bat --topic javaguides --from-beginning --bootstrap-server localhost:9092
```
### Create Consumer class
```java=
@Service
@Log4j2
public class KafkaConsumer {
@KafkaListener(topics = "javaguides", groupId = "myGroup")
public void consume(String message) {
log.info("Message received -> {}", message);
}
}
```
## Spring Boot + Kafka Producer and Consumer for JSON message

### Configure Producer and Consumer for Json Serializer and Deserializer
```shell=
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spinrg.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```
### Create Simple Pojo to serialize and deserialize
```java=
@Data
public class User {
private Integer id;
private String firstName;
private String lastName;
}
```
### Create Kafka Producer to produce JSON message
```java=
@Service
@Log4j2
public class JsonKafkaProducer {
private KafkaTemplate<String, User> kafkaTemplate;
@Autowired
public void setKafkaTemplate(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(User user) {
log.info("Message sent -> {}", user);
Message<User> message = MessageBuilder
.withPayload(user)
.setHeader(KafkaHeaders.TOPIC, "javaguides")
.build();
kafkaTemplate.send(message);
}
}
```
### Create Rest API to send Json Object
```java=
@RestController
@RequestMapping("api/v1/kafka")
public class JsonMessageController {
private JsonKafkaProducer jsonKafkaProducer;
@Autowired
public void setJsonKafkaProducer(JsonKafkaProducer jsonKafkaProducer) {
this.jsonKafkaProducer = jsonKafkaProducer;
}
@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody User user) {
jsonKafkaProducer.sendMessage(user);
return ResponseEntity.ok("JSON message sent to kafka topic");
}
}
```
#### Create new topic for json
```java=
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic() {
return TopicBuilder.name("javaguides").build();
}
@Bean
public NewTopic javaguidesJsonTopic() {
return TopicBuilder.name("javaguides_json").build();
}
}
```
### Create Kafka Consumer to consume JSON message
```java=
@Service
@Log4j2
public class JsonKafkaConsumer {
@KafkaListener(topics = "javaguides_json", groupId = "myGroup")
public void consume(User user) {
log.info("Json message received -> {}", user);
}
}
```
### Refactor code to externalize the topic name
## Real world project - Kafka Producer Wikimedia


### split into multi module
* root pom.xml
```xml=
<packaging>pom</packaging>
```
* kafka-producer-wikimedia pom.xml
```xml=
<packaging>jar</packaging>
```
```java=
@SpringBootApplication
public class SpringBootProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootProducerApplication.class);
}
}
```
### Configure Wikimedia Producer and Create a topic
```shell=
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```
```java=
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic topic() {
return TopicBuilder.name("wikimedia_recentchange").build();
}
}
```
### Create WikimediaChangeHandler class
```java=
@Log4j2
@Component
public class WikimediaChangeHandler implements EventHandler {
private KafkaTemplate<String, String> kafkaTemplate;
private final String topic = "wikimedia_recentchange";
@Autowired
public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void onOpen() throws Exception {
}
@Override
public void onClosed() throws Exception {
}
@Override
public void onMessage(String s, MessageEvent messageEvent) throws Exception {
log.info("event data -> {}", messageEvent.getData());
kafkaTemplate.send(topic, messageEvent.getData());
}
@Override
public void onComment(String s) throws Exception {
}
@Override
public void onError(Throwable throwable) {
}
}
```
### Create WikiMediaChangesProducer class
```java=
@Service
@Log4j2
public class WikiMediaChangesProducer {
private KafkaTemplate<String, String> kafkaTemplate;
private EventHandler eventHandler;
@Autowired
public void setKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Autowired
public void setEventHandler(EventHandler eventHandler) {
this.eventHandler = eventHandler;
}
public void sendMessage() throws InterruptedException {
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
// to read real time stream data from wikimedia, we use event source
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
```
### SpringBootProducerApplication implements CommandLineRunner
```java=
@SpringBootApplication
public class SpringBootProducerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SpringBootProducerApplication.class);
}
private WikiMediaChangesProducer wikiMediaChangesProducer;
@Autowired
public void setWikiMediaChangesProducer(WikiMediaChangesProducer wikiMediaChangesProducer) {
this.wikiMediaChangesProducer = wikiMediaChangesProducer;
}
@Override
public void run(String... args) throws Exception {
wikiMediaChangesProducer.sendMessage();
}
}
```
## Real world project - Kafka Consumer Database
```xml=
<packaging>jar</packaging>
```
```java=
@SpringBootApplication
public class SpringBootConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootConsumerApplication.class);
}
}
```
### Configure Kafka Consumer in application.properties File
```shell=
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
### Kafka Consumer Implementation
```java=
@Service
@Log4j2
public class KafkaDatabaseConsumer {
@KafkaListener(topics = "wikimedia_recentchange", groupId = "myGroup")
public void consume(String eventMessage) {
log.info("Message received -> {}", eventMessage);
}
}
```
### Configure MySQL Database
> add dependencies in consumer pom.xml
```xml=
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
```
```shell=
spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia
spring.datasource.username=root
spring.datasource.password=root
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.properties.hibernate.use_sql_comments=true
spring.jpa.properties.hibernate.format_sql=true
```
### Save Wikimedia Data into MySQL Database
```java=
@Entity
@Table(name = "wikimedia_recentchange")
@Getter
@Setter
public class WikimediaData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Lob
private String wikiEventData;
}
```
```java=
@Repository
public interface WikimediaDataRepository extends JpaRepository<WikimediaData, Long> {
}
```
```java=
@Service
@Log4j2
public class KafkaDatabaseConsumer {
private WikimediaDataRepository wikimediaDataRepository;
@Autowired
public void setWikimediaDataRepository(WikimediaDataRepository wikimediaDataRepository) {
this.wikimediaDataRepository = wikimediaDataRepository;
}
@KafkaListener(topics = "wikimedia_recentchange", groupId = "myGroup")
public void consume(String eventMessage) {
log.info("Message received -> {}", eventMessage);
WikimediaData wikimediaData = new WikimediaData();
wikimediaData.setWikiEventData(eventMessage);
wikimediaDataRepository.save(wikimediaData);
}
}
```
## Event Driven Architecture
### What is Event Driven Architecture


### How Event Driven Architecture Works and It's Advantages

## Event-Driven Microservices Using Spring Boot and Apache Kafka

### Create 4 Microservices - OrderService, StockService, EmailService & Base-Domains
> order service, stock service, email service and base domains
### Base-Domains Microservice - Create DTO Classes - Order and OrderEvent
```java=
@Data
public class Order {
private String orderId;
private String name;
private Integer quantity;
private BigDecimal price;
}
```
```java=
@Data
public class OrderEvent {
private String message;
private String status;
private Order order;
}
```
### OrderService Microservice - Configure Kafka Producer
```shell=
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.topic.name=order_topics
```
### OrderService Microservice - Configure Kafka Topic
```java=
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.topic.name}")
private String topicName;
// spring bean for kafka topic
@Bean
public NewTopic topic() {
return TopicBuilder.name(topicName).build();
}
}
```
### OrderService Microservice - Create Kafka Producer

> add base-domains project as dependency to order-service project
```xml=
<dependency>
<groupId>com.example</groupId>
<artifactId>base-domains</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
```
```java=
@Service
@Log4j2
public class OrderProducer {
private NewTopic topic;
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Autowired
public void setTopic(NewTopic topic) {
this.topic = topic;
}
@Autowired
public void setKafkaTemplate(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(OrderEvent orderEvent) {
log.info("Order event => {}", orderEvent);
// create Message
Message<OrderEvent> message = MessageBuilder
.withPayload(orderEvent)
.setHeader(KafkaHeaders.TOPIC, topic.name())
.build();
kafkaTemplate.send(message);
}
}
```
### OrderService Microservice - Create REST API to Send Order & Test Kafka Producer
```java=
@RestController
@RequestMapping("/api/v1")
public class OrderController {
private OrderProducer orderProducer;
@Autowired
public void setOrderProducer(OrderProducer orderProducer) {
this.orderProducer = orderProducer;
}
@PostMapping("/orders")
public String placeOrder(@RequestBody Order order) {
order.setOrderId(UUID.randomUUID().toString());
OrderEvent orderEvent = new OrderEvent();
orderEvent.setStatus("PENDING");
orderEvent.setMessage("order status is pending");
orderEvent.setOrder(order);
orderProducer.sendMessage(orderEvent);
return "order placed successfully";
}
}
```
### StockService Microservice - Configure Kafka Consumer

```shell=
server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=stock
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics
```
### StockService Microservice - Create Kafka Consumer
```java=
@Service
@Log4j2
public class OrderConsumer {
private OrderLogRepository orderLogRepository;
@Autowired
public void setOrderLogRepository(OrderLogRepository orderLogRepository) {
this.orderLogRepository = orderLogRepository;
}
@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(OrderEvent orderEvent) {
log.info("Order event received in stock service => {}", orderEvent);
// save the order event into database
OrderLog orderLog = new OrderLog();
orderLog.setMessage(orderEvent.getMessage());
orderLog.setStatus(orderEvent.getStatus());
orderLog.setOrderId(orderEvent.getOrder().getOrderId());
orderLog.setOrderName(orderEvent.getOrder().getName());
orderLog.setOrderQuantity(orderEvent.getOrder().getQuantity());
orderLog.setOrderPrice(orderEvent.getOrder().getPrice());
orderLogRepository.save(orderLog);
log.info("order event received => {}", orderEvent);
log.info("order log saved => {}", orderLog);
}
}
```
```java=
@Entity
@Table(name = "order_log")
@Getter
@Setter
@ToString
public class OrderLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "message")
private String message;
@Column(name = "status")
private String status;
@Column(name = "order_id")
private String orderId;
@Column(name = "order_name")
private String orderName;
@Column(name = "order_quantity")
private Integer orderQuantity;
@Column(name = "order_price")
private BigDecimal orderPrice;
}
```
```java=
@Repository
public interface OrderLogRepository extends JpaRepository<OrderLog, Long> {
}
```
```shell=
server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=stock
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics
spring.datasource.url=jdbc:mysql://localhost:3306/stock
spring.datasource.username=root
spring.datasource.password=root
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.properties.hibernate.use_sql_comments=true
spring.jpa.properties.hibernate.format_sql=true
```
### EmailService Microservice - Configure and Create Kafka Consumer
```shell=
server.port=8082
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=email
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics
```