# Documentation for Apache Kafka
- In simple terms, Apache Kafka is a data streaming platform that handles a continuous flow of data in real time with fault tolerance.
### Terms Used in Apache Kafka :
**Producer:**
- It is the application which sends data to the Kafka server.
- Data is called a message in Kafka.
- For Kafka, a message is just an array of bytes.
- Kafka can have many producers. This can be set up based on our requirement.
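Since a message is only an array of bytes, the producer and consumer have to agree on how to turn application data into bytes and back. The following is a minimal sketch of one possible convention (UTF-8 encoded JSON); the helper names and the sample record are hypothetical and not part of Kafka.

```python
import json


def to_message_bytes(record):
    """Serialize a dict to UTF-8 encoded JSON bytes (one possible wire format)."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def from_message_bytes(payload):
    """Inverse of to_message_bytes: decode the bytes back into a dict."""
    return json.loads(payload.decode("utf-8"))


record = {"id": 1, "name": "alice"}  # hypothetical sample record
payload = to_message_bytes(record)
restored = from_message_bytes(payload)

print(type(payload).__name__)  # the value handed to Kafka is plain bytes
print(restored == record)
```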
**Consumer:**
- The producer sends data to the Kafka server and the consumer gets the data from the Kafka server.
- A consumer can get data from more than one producer.
- If a consumer goes down for any reason, when it comes back up it starts reading messages from where it last left off.
**Broker:**
- The producer and consumer interact using the Broker as an agent.
**Cluster:**
- A group of Brokers forms a cluster.
**Topic:**
- A topic is a unique name for a data stream.
- It is similar to a category, used to differentiate between the types of data sent.
- When data is sent by the producer to a Topic that does not exist yet, the Topic is created (if automatic topic creation is enabled) and all the data is sent under this Topic.
- Consumers subscribe to these topics to choose the data they want to receive.

**Partition:**
- When a Topic has a very large amount of data, the system Kafka is running on may not have enough space.
- So the Topic is divided into Partitions to store data on multiple servers.
- The number of partitions of a Topic is decided by us, and the Topic's data is distributed across all the partitions.
- A partition cannot be split again, so estimate the size before partitioning.
- The default number of partitions for a Topic is 1.
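As a rough illustration of how messages can be spread over partitions, the sketch below maps a message key to a partition index by hashing it. This is a simplification: real clients use their own partitioner and hash function, and CRC32 is used here only as a stand-in. The function name, keys and partition count are hypothetical.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a message key (bytes) to a partition index in [0, num_partitions)."""
    if num_partitions < 1:
        raise ValueError("a topic needs at least one partition")
    # The same key always hashes to the same partition.
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 3  # hypothetical partition count for the topic
for key in (b"user-1", b"user-2", b"user-1"):
    print(key, "->", choose_partition(key, NUM_PARTITIONS))
```

Because the mapping depends on the partition count, changing the number of partitions later changes where a given key lands, which is one more reason to estimate the count up front.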
**Offset :**
- It is the number given to a message in the partition.

- This number is immutable, i.e. it cannot be changed.
- Kafka stores messages in the order of arrival within a partition.
- The offset number is local to the partition.
- Hence to locate a message we need the following: the Topic name, the partition number and the offset.
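As a rough illustration of per-partition offsets, the sketch below models each partition as an append-only list. It is a toy in-memory model, not how a broker stores data; the class and variable names are hypothetical.

```python
class PartitionLog:
    """Toy in-memory model of one partition: an append-only list of messages."""

    def __init__(self):
        self._messages = []

    def append(self, value):
        """Store a message and return the offset assigned to it."""
        self._messages.append(value)
        return len(self._messages) - 1

    def read(self, offset):
        """Return the message stored at the given offset."""
        if offset < 0:
            raise IndexError("offsets start at 0")
        return self._messages[offset]


# Hypothetical topic with two partitions; offsets start at 0 in each of them.
topic = {0: PartitionLog(), 1: PartitionLog()}
first = topic[0].append(b"a")
second = topic[0].append(b"b")
other = topic[1].append(b"c")

print(first, second, other)  # 0 1 0: offsets are counted per partition
print(topic[0].read(1))      # b'b': located by partition number and offset
```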

**Consumer Group:**
- A group of Consumers that share the work to speed up the receiving side.
- If many producers are present and there is only one consumer, the load on the consumer side may be heavy.
- Hence the load is split among many Consumers who work together to get the job done faster.

- The maximum number of active consumers in a group is the total number of partitions of the Topic; any additional consumers stay idle.
- Kafka doesn't allow more than one consumer in the same group to read from the same partition simultaneously. This restriction is necessary to avoid double reading of records.
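The relation between partitions and group members can be sketched as a simple round-robin assignment. This is an illustrative simplification, not Kafka's actual assignment strategy (which is configurable on the client); the function and consumer names are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Give every partition exactly one owner, cycling through the consumers."""
    assignment = {consumer: [] for consumer in consumers}
    if not consumers:
        return assignment
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2]  # hypothetical Topic with 3 partitions
two_members = assign_partitions(partitions, ["c1", "c2"])
four_members = assign_partitions(partitions, ["c1", "c2", "c3", "c4"])

print(two_members)   # {'c1': [0, 2], 'c2': [1]}
print(four_members)  # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

With four consumers and three partitions the fourth consumer gets nothing to read.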
**Zookeeper:**
- Kafka uses Zookeeper to store metadata about the Kafka cluster, as well as consumer client details.

- By default on Linux the data in the Kafka server and the offset information are stored in **/tmp/kafka-logs/**.
### Setup Apache Kafka :
### Confluent
- Confluent is a startup founded by the creators of Kafka.
- The Confluent Platform is a streaming platform that enables you to organize and manage data from many different sources with one reliable, high performance system.
- Its software, Confluent Platform, bundles all the Kafka packages and dependencies in one.
### Install Java version 1.8
- Zookeeper requires Java version 1.8 to run. Install it using the following command
```
sudo apt-get install openjdk-8-jdk
```
### Install confluent :
1. Install the Confluent public key, which is used to sign the packages in the APT repository.
wget -qO - https://packages.confluent.io/deb/3.3/archive.key | sudo apt-key add -
2. Add the repository to your **/etc/apt/sources.list**
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/3.3 stable main"
3. Run apt-get update and install Confluent Platform. (Here we are using the open source version of Confluent.)
sudo apt-get update && sudo apt-get install confluent-platform-oss-2.11
### Running Confluent :
- Start **Confluent** using the following command
confluent start
After starting, all the services should be reported as up.

- Stop **Confluent** using the following command
confluent stop

- Check status of **Confluent** using the following command
confluent status

### Configuration :
- The configuration files for kafka are present in
/etc/kafka
- By default Kafka doesn't allow us to delete topics in the version installed above (newer Kafka releases enable topic deletion by default). We can enable it by uncommenting the following line in **/etc/kafka/server.properties**
delete.topic.enable=true
- **Retention time:**
    - The amount of time Kafka keeps messages on the Kafka server before they are deleted.
    - The default retention time is 168 hours (7 days).
    - To change the retention time go to **/etc/kafka/server.properties** and set the following to the required number of hours (168 hours is 7 days)
log.retention.hours=168
- For more about **retention** and **retention policy** refer to the [later part of the documentation](https://hackmd.io/f9gNe1QfSQqojOENsTRXGw?view#Kafka-retention-policy-).
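To make the effect of a retention time concrete, here is a minimal sketch that checks whether a message timestamp falls outside a retention window given in hours. It only illustrates the arithmetic: a real broker removes whole log segments rather than individual messages, and the function names and timestamps below are hypothetical.

```python
def retention_ms(hours):
    """Convert a retention period in hours to milliseconds."""
    return hours * 60 * 60 * 1000


def is_expired(message_timestamp_ms, now_ms, retention_hours):
    """True if the message is older than the retention period."""
    return now_ms - message_timestamp_ms > retention_ms(retention_hours)


now_ms = 1_600_000_000_000                 # hypothetical current time in ms
eight_days_ago = now_ms - retention_ms(8 * 24)
one_hour_ago = now_ms - retention_ms(1)

print(retention_ms(168))                      # 604800000 ms in 7 days
print(is_expired(eight_days_ago, now_ms, 168))  # True
print(is_expired(one_hour_ago, now_ms, 168))    # False
```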
### Confluent kafka python
- To customize how data is sent or received we can write our own producer or consumer using Confluent's Python client **confluent-kafka-python**.
- Install **confluent-kafka-python** using the following command
pip install confluent-kafka
- Optionally we can install Avro support to format the messages being sent.
pip install confluent-kafka[avro]
- To install from source from PyPI (requires librdkafka and its dependencies to be installed separately):
pip install --no-binary :all: confluent-kafka
#### Github Repo :
Ref - https://github.com/confluentinc/confluent-kafka-python
### Kafka producer example using confluent-kafka-python :
- A producer that sends the rows of a MySQL table using the Django ORM.
```
from json import dumps

from confluent_kafka import Producer

from app.models import UserTable  # Django model from this project

p = Producer({
    'bootstrap.servers': '<ip-address>:<port>'
})


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


# General form: p.produce('topic_name', value, callback=<optional callback>)
@api()  # project-specific decorator, defined elsewhere in the application
def kafka_producer_test(req):
    obj = UserTable.objects.all().values()
    for message in obj:
        # Serve delivery callbacks from earlier produce() calls
        p.poll(0)
        p.produce('confluent_test', dumps(message), callback=delivery_report)
    # Wait until all outstanding messages have been delivered
    p.flush()
    return obj
```
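Rows read from a database often contain values such as dates or decimals, which json's dumps cannot serialize on its own. The following is a minimal sketch of one way to handle this, by falling back to str() for unknown types; the helper name and the sample row are hypothetical, and the consumer would need to convert such strings back if it needs the original types.

```python
import json
from datetime import datetime
from decimal import Decimal


def encode_row(row):
    """Serialize a row dict to JSON bytes, using str() for values that
    json cannot handle natively (for example datetime or Decimal)."""
    return json.dumps(row, default=str).encode("utf-8")


# Hypothetical row, shaped like one item of a Django values() queryset.
row = {"id": 7, "created": datetime(2020, 1, 2, 3, 4, 5), "balance": Decimal("9.50")}
encoded = encode_row(row)

print(encoded)
```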
### Kafka Consumer example using confluent-kafka-python :
```
from json import loads

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': '<ip-address>:<port>',
    'group.id': 'mygroup',  # Group id is mandatory
    'auto.offset.reset': 'earliest'
})

# Topic name to subscribe, can provide multiple topic names.
c.subscribe(['topic_name'])

try:
    while True:
        msg = c.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        # Instead of printing we can store the messages to our database or do as per requirement.
        print(loads(msg.value()))
except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer so it leaves the group cleanly
    c.close()
```
- In the above examples, **bootstrap.servers** is the address and port of the host where Kafka is running.
- **auto.offset.reset** is used on the consumer side to choose where to start reading when the consumer has no stored offset: from the beginning of the Topic or only the latest messages.
Ref - https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties
### Poll :
- After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked.
- The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned.
- Underneath the covers, the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown), then no heartbeats will be sent.
- If a period of the configured session timeout elapses before the server has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned. This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions it was assigned (thus preventing active consumers in the group from taking them).
- To stay in the group, you have to prove you are still alive by calling poll.
For more info
Ref:https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
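The liveness rule described above can be sketched with a small model in which every call to poll records a heartbeat and a member is dropped once its last heartbeat is older than the session timeout. This is a toy simulation with explicit timestamps, not the client's real heartbeat mechanism; all names and numbers are hypothetical.

```python
class GroupMember:
    """Tracks when a consumer last proved it was alive (toy model)."""

    def __init__(self, name, now):
        self.name = name
        self.last_heartbeat = now

    def poll(self, now):
        """In this model, calling poll counts as sending a heartbeat."""
        self.last_heartbeat = now


def live_members(members, now, session_timeout):
    """Names of members whose last heartbeat is within the session timeout."""
    return [m.name for m in members if now - m.last_heartbeat <= session_timeout]


SESSION_TIMEOUT = 10.0  # hypothetical timeout in seconds
a = GroupMember("a", now=0.0)
b = GroupMember("b", now=0.0)
a.poll(now=8.0)         # "a" keeps polling, "b" has stopped

alive = live_members([a, b], now=12.0, session_timeout=SESSION_TIMEOUT)
print(alive)  # ['a']: the partitions of "b" would be reassigned
```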
### Kafka retention policy :
- Unlike many other messaging systems, Apache Kafka follows a message retention policy that is independent of its consumers/subscribers.
- Hence, it is very important to configure an appropriate retention policy for messages to avoid any data loss.
- For example, if we configure the message retention time too low, we run the risk of losing messages before they have been processed by all of our applications.
- On the other hand, if the retention time is too high, these messages would be sitting unused in Kafka partitions, consuming system resources.
- For setting up kafka retention policy
Ref - https://www.allprogrammingtutorials.com/tutorials/configuring-messages-retention-time-in-kafka.php
- A topic marked for deletion is not removed immediately
Ref - https://stackoverflow.com/questions/23976670/when-how-does-a-topic-marked-for-deletion-get-finally-removed
### Kafka Options explained :
- **auto.create.topics.enable** - With the default Kafka configuration the broker automatically creates a topic under the following circumstances:
    - When a producer starts writing messages to the topic
    - When a consumer starts reading messages from the topic
    - When any client requests metadata for the topic
- **enable.auto.commit** - The default value is true, in which case the consumer's offset is periodically committed in the background.
- **auto.offset.reset**
    - This is used only when no previous offset is present for the consumer.
    - When there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted) then we can use the following options.
        - **earliest:** automatically reset the offset to the earliest offset
        - **latest:** automatically reset the offset to the latest offset
        - **none:** throw exception to the consumer if no previous offset is found for the consumer's group
        - **anything else:** throw exception to the consumer.
- **group.id**
    - A unique string that identifies the consumer group this consumer belongs to.
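The behaviour of **auto.offset.reset** described above can be summarised as a small decision function. This is a sketch of the rule as stated in this section, not code taken from any client library; the function and parameter names are hypothetical.

```python
def starting_offset(committed, log_start, log_end, reset_policy):
    """Pick the offset a consumer starts reading from.

    committed  -- last committed offset for the group, or None if there is none
    log_start  -- earliest offset still available in the partition
    log_end    -- offset the next new message will get
    """
    # A valid committed offset always wins; the reset policy is not consulted.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise ValueError("no valid offset and auto.offset.reset=%r" % reset_policy)


print(starting_offset(None, 5, 20, "earliest"))  # 5: no offset yet, start at the beginning
print(starting_offset(None, 5, 20, "latest"))    # 20: only new messages
print(starting_offset(12, 5, 20, "latest"))      # 12: resume where the group left off
print(starting_offset(2, 5, 20, "earliest"))     # 5: offset 2 was already deleted
```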
### Deleting a Topic :
- A Topic can be marked for deletion using the following command
/usr/bin/kafka-topics --zookeeper localhost:2181 --delete --topic <topic_name>
- The Topic will not be deleted immediately. For more info :
Ref : https://stackoverflow.com/questions/23976670/when-how-does-a-topic-marked-for-deletion-get-finally-removed
- To remove it manually
Ref : https://medium.com/@contactsunny/manually-delete-apache-kafka-topics-424c7e016ff3
### Get same messages from a topic in multiple consumers :
- When there is only one consumer and it is subscribed to a Topic, it receives all the messages for that Topic.
- To reduce the load on a single consumer we can use Partitions and Consumer Groups.
- When many consumers are present in a consumer group, each consumer can be assigned a partition to speed up the process.
- But if we want the same data in multiple consumers (like in our case, syncing collection data to multiple setups) we need to create different consumer groups.
- They can be created easily on the consumer side. Refer to the example below.
- Let's assume both consumers are listening to the same topic.
**Consumer 1 is listening for messages in server A and belongs to Group1**
```
from confluent_kafka import Consumer

c1 = Consumer({
    'bootstrap.servers': '<hostname>:<port>',
    'group.id': 'group1',
    'auto.offset.reset': 'earliest'
})
```
**Consumer 2 is listening for messages in server B and belongs to Group2**
```
# On another remote host, another consumer belongs to group2
from confluent_kafka import Consumer

c2 = Consumer({
    'bootstrap.servers': '<hostname>:<port>',
    'group.id': 'group2',
    'auto.offset.reset': 'earliest'
})
```
**Note : If both consumers are given the same group id, each message is delivered to only one of them, and if the Topic has a single partition the other consumer stays idle. To avoid this we can use different consumer groups as shown in the above example.**
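The reason different group ids give every consumer the full data can be sketched by tracking one read position per group. This is a toy single-partition model for illustration only; the class, method and group names are hypothetical.

```python
class ToyTopic:
    """Single-partition topic that keeps one read position per consumer group."""

    def __init__(self):
        self.messages = []
        self.group_offsets = {}

    def produce(self, value):
        self.messages.append(value)

    def consume(self, group_id):
        """Return the next unread message for this group, or None if caught up."""
        offset = self.group_offsets.get(group_id, 0)
        if offset >= len(self.messages):
            return None
        self.group_offsets[group_id] = offset + 1
        return self.messages[offset]


toy = ToyTopic()
for value in ("m1", "m2"):
    toy.produce(value)

group1_seen = [toy.consume("group1") for _ in range(2)]
group2_seen = [toy.consume("group2") for _ in range(2)]
group1_next = toy.consume("group1")

print(group1_seen)  # ['m1', 'm2']
print(group2_seen)  # ['m1', 'm2']: group2 has its own position
print(group1_next)  # None: group1 has already read everything
```

Two consumers sharing "group1" would share the same position, so each message would reach only one of them.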
### Useful Commands
**Note** : If the below commands fail with a "port already in use" error, please refer to
https://hackmd.io/f9gNe1QfSQqojOENsTRXGw?both#Issues-when-JMX-enabled-
- To check the list of available topics
/usr/bin/kafka-topics --list --zookeeper localhost:2181
- To create a topic (the replication factor cannot be larger than the number of brokers)
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
- To delete a topic
/usr/bin/kafka-topics --zookeeper localhost:2181 --delete --topic topic_name
- Command to list the number of brokers
/usr/bin/zookeeper-shell localhost:2181 <<< "ls /brokers/ids"
- Command to get information of a particular Topic
/usr/bin/kafka-topics --describe --zookeeper localhost:2181 --topic confluent_test
- To get a list of all the consumer groups
/usr/bin/kafka-consumer-groups --list --bootstrap-server localhost:9092
- To get info about a particular consumer group
/usr/bin/kafka-consumer-groups --describe --bootstrap-server <ip-address>:9092 --group mygroup
- Good resource for using regex in consumer
Ref : https://kafka-python.readthedocs.io/en/master/usage.html
### Issues when JMX enabled :
- Tools such as kafka-topics and kafka-console-producer fail when JMX is enabled. This is caused by the JMX_PORT environment variable: the tools pick it up as well and try to open the port that the broker is already using.
For more info
Ref : https://github.com/wurstmeister/kafka-docker/wiki#why-do-kafka-tools-fail-when-jmx-is-enabled
### Best References and Resources:
- **To understand Kafka Architecture**
Ref : https://www.tutorialkart.com/apache-kafka/apache-kafka-architecture/
Ref: https://insidebigdata.com/2018/04/12/developing-deeper-understanding-apache-kafka-architecture/
- **To setup producer and consumers**
Ref : https://www.tutorialkart.com/apache-kafka/kafka-connector-mysql-jdbc/
- **To Configure how the data is sent in producer side and where the data is stored in Consumer side in Python**
Ref : https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1
- **Configuration Options for producer and consumer**
Ref : https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties
- **Complete Kafka Documentation**
Ref : http://kafka.apache.org/documentation.html
- **Brief Introduction of how to setup multiple brokers in a cluster**
Ref : https://itnext.io/creating-a-kafka-topic-running-kafka-brokers-d9aab5530fb2
- **Good resource to setup Multi broker cluster**
Ref : http://www.techburps.com/misc/multi-broker-apache-kafka-cluster-setup/64