# Big Data (UE21CS343AB2) - YAKt (Yet Another KRaft) ## Introduction - Yet Another KRaft (YAKRaft) is one of projects as a part of the Big Data Course (UE21CS343AB) at PES University - This project involves the students developing the KRaft protocol from scratch, managing metadata for the Kafka system. ### About KRaft - KRaft is a event based, distributed metadata management system that was written to replace Zookeeper in the ecosystem of Kafka. - It uses Raft as an underlying consensus algorithm to do log replication and manage consistency of state. ## Project Objectives and Outcomes Gain a deeper understanding of - The Raft Consensus algorithm and concepts of Leader Election, Log Replication, Fault Tolerance - ZAB Consensus algorithm - Kraft and Zookeeper Architecture and its nuances - Event driven Architecture - Kafka Architecture Outcomes - A robust system that mimics the working of KRaft - A project with a real world use-case ## Technologies/Languages to be used - You are free to use any language such as Python, Go, Java, among others. - Ensure that the chosen language supports all the required functionality. - You are allowed to use any external libraries or APIs if required. - NOTE : You cannot demo KRaft (Original) as your project --- ## Project Specification ![](https://hackmd.io/_uploads/S1B6wXAM6.png) ### Raft Node Must handle the following (going above and beyond these are upto the developer) - Leader election : selecting the leader for the KRaft cluster - Event driven architecture - Eventual Consistency - Failover management : must be able to provide standard raft failover guarantees (3 node cluster can handle single failure; 5 node cluster can handle 2 failures;) - Maintaining event log : event log of all changes being made (this can be used to reconstruct the metadata store) - Snapshotting : - Creation and retrieval of the snapshots - Take periodic snapshots of the event log at the leader to be able to provide a level of fault tolerance NOTE : Most of the above are inherently supported by raft as a consensus algorithm; ## Metadata Storage ### Records Types For the scope of this project add some limitations/modifications to the original records (as seen in reference), - Record types - Record structure Ref : [KRaft Metadata Records](https://cwiki.apache.org/confluence/display/KAFKA/KIP-746%3A+Revise+KRaft+Metadata+Records) 1) RegisterBrokerRecord ```json { "type": "metadata", "name": "RegisterBrokerRecord", "fields": { "internalUUID": "", // type: string; set by the server "brokerId": 0, // type: int; given by client, through config "brokerHost": "", // type: string; given by client, through config "brokerPort": "", // type: string; given by client, through config "securityProtocol": "", // type: string; given by client, through config "brokerStatus": "", // type: string; set by server, can be set to default value "ALIVE" "rackId": "", // type: string; given by client, through config "epoch": 0 // type: int; epoch number given by the quorum controller; initialized as 0 when registered for first time; must increment when there are updates; }, "timestamp": "" } ``` 2) TopicRecord ```json { "type": "metadata", "name": "TopicRecord", "fields": { "topicUUID": "", // type: string; must be set by server; "name": "" // type: string; given by client, through config }, "timestamp": "" // type: timestamp } ``` 3) PartitionRecord ```json { "type": "metadata", "name": "PartitionRecord", "fields": { "partitionId": 0, // type: int; given by client "topicUUID": "", // type: string; given by client "replicas": [], // type: []int; list of broker IDs with replicas; given by client, through config "ISR": [], // type: []int; list of insync broker ids; given by client, through config "removingReplicas": [], // type: []int; list of replicas in process of removal; set as [] by default; updated by the server but not in scope of the project; "addingReplicas": [], // type: []int; list of replicas in the process of addition; set as [] by default; updated by the server but not in scope of the project; "leader": "", // type: string; uuid of broker who is leader for partition; given by client "partitionEpoch": 0 // type: int; number that incrementatlly changes with changes made to partition; set to 0 by default; updated by server when there are updates but not in scope of the project; }, "timestamp": "" // type: timestamp } ``` 4) ProducerIdsRecord ```json { "type": "metadata", "name": "ProducerIdsRecord", "fields": { "brokerId": "", // type : string/int; uuid of requesting broker; given by client "brokerEpoch": 0, // type : int; the epoch at which broker requested; set to broker epochl "producerId": 0 // type : int; producer id requested; given by client }, "timestamp": "" // type: timestamp } ``` 5) BrokerRegistrationChangeBrokerRecord ```json { "type": "metadata", "name": "RegistrationChangeBrokerRecord", "fields": { "brokerId": "", // type: string; given by client "brokerHost": "", // type: string; given by client "brokerPort": "", // type: string; given by client "securityProtocol": "", // type: string; given by client "brokerStatus": "", // type: string; given by client "epoch": 0 // type: int; set to broker epoch after update }, "timestamp": "" // type: timestamp } ``` Broker Statuses : - INIT - FENCED - UNFENCED - ALIVE - CLOSED #### Storage Data Structure - Implementation of how these records are stored is upto the developers; as long as it can do the below functions Example/Suggested (key-value store) : ```json { "RegisterBrokerRecords": { "records": [], // type : []RegisterBrokerRecord "timestamp": "", // type : timestamp; updated time ... // feel free to add any auxillary fields to help with the functionality }, ... } ``` ## HTTP Server #### CRD API - Must include Create, Read, Delete APIs for the Record Type mentioned above - Create : creation of records, returning unique ID of the entity if available - Read : return single/array of records - Delete : deletion; returns unique ID of entity (only for brokers); update status as required; ### Broker Management API Requests made by the Brokers #### API Spec RegisterBroker API (POST Request) - The first time the broker comes online, it registers itself with KRaft - Add required MetadataRecords Accordingly RegisterBrokerChange API (POST Request) - Any changes to be made to the Broker info is sent to this endpoint MetadataFetch API (POST Request) - Acts as the heartbeat mechanism for a broker - Broker sends the last offset of metadata that the broker is aware of (Last offset can be considered as timestamp) - Server must reply back with - All changes to the metadata since last offset - If offset is too far behind latest, send the entire snapshot (most recent snapshot) of metadata ### Client Management API Requests made by the Consumers and Producers #### API Spec RegisterProducer API (POST Request) - Register the Producer MetadataFetch API (GET Request) - Fetch Topics, Partition and Broker Info Records ## Endpoints These are the endpoints that are to be made. Going above and beyond these is upto the developer - RegisterBrokerRecord - register - get all active brokers - get broker by id - TopicRecord - create topic - get topic by name - PartitionRecord - create partition - ProducerIdsRecord - register a producer from a broker - BrokerRegistrationChangeBrokerRecord - updates to broker; increment the epoch on changes; - unregister a broker - BrokerMgmt - a route takes previous offset/timestamp and returns metadata updates since then / if later than 10 minutes send entire snapshot; send diff of all metadata that has been updated - ClientMgmt - a route takes previous offset/timestamp and returns metadata updates since then / if later than 10 minutes send entire snapshot; send only topics, partitions and broker info ## Tips 1) To demo multiple nodes the following approaches can be done (not an exhaustive list) - Using multiple processes on different computers - Using multiple processes on same computer - Spawn multiple nodes on different VMs 2) Can use an implementation of raft in the language of choice to back your KRaft implementation ## Weekly Guidelines This is merely an ideal/suggested timeline for the project. ### Week 1 - Read the raft paper and understand the nuances - Read about KRaft and Zookeeper - Pick a language - Setting up project - Setup a simple HTTP Server - Figure out the technologies/libraries/frameworks to be used ### Week 2 - Setup raft node - Chalk out the endpoints required ### Week 3 - Setup the HTTP endpoints as per requirements - Test ## References [KIP-500 Replace Zookeeper with KRaft](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum) [Working of Zookeeper](https://zookeeper.apache.org/doc/r3.5.4-beta/zookeeperOver.html) [KRaft Metadata Records](https://cwiki.apache.org/confluence/display/KAFKA/KIP-746%3A+Revise+KRaft+Metadata+Records#KIP746:ReviseKRaftMetadataRecords-BrokerRegistrationChangeRecord) [Confluent : Intro to KRaft (Suggested Read)](https://www.slideshare.net/HostedbyConfluent/introducing-kraft-kafka-without-zookeeper-with-colin-mccabe-current-2022) [Hussein Nasser : How does Kafka work?](https://www.youtube.com/watch?v=LN_HcJVbySw) [What is Zookeeper and how is it working with Apache Kafka?](https://www.youtube.com/watch?v=t0FDmj4kaIg) [Martin Kleppman : Consensus](https://www.youtube.com/watch?v=rN6ma561tak&pp=ygUNemFiIGNvbnNlbnN1cw%3D%3D) [Zookeeper Explained](https://www.youtube.com/watch?v=gZj16chk0Ss) [Raft Visualization](https://thesecretlivesofdata.com/raft/) [Raft Paper & More](https://raft.github.io/) [Core Dump : Understanding Raft](https://www.youtube.com/watch?v=IujMVjKvWP4&pp=ygUNemFiIGNvbnNlbnN1cw%3D%3D)