# Experimental Guide
## Introduction
This experimental guide aims to collect statistics about the three tools analysed in this thesis: **Confluent Control Center**, **Kadeck**, and **UI for Apache Kafka** (Kafka-UI).
In order to create the most faithful and realistic comparison, these three tools will be tested in the same environment and against the same set of tasks: (TODO)
1. Start the application
2. CRUD Kafka topics
3. CRUD schemas (with references)
4. Create messages (also with custom headers) and send them
5. Receive messages (also with custom headers)
6. Authentication
## Use Case: Weather Data Monitoring
To make this experience more realistic and easier to understand, let's consider a use case: a weather data monitoring system that uses Apache Kafka to collect weather data from various sensors and stations and to process it in real time, raising alerts based on specific weather conditions.
## Confluent Control Center
### Task 1: Start Confluent Control Center with experimental environment
1. Open *Docker Desktop*.
2. Open the [`docker-compose` file](#Experimental-Environment) in *Visual Studio Code*.
3. Open a new terminal in *VSCode* by clicking `Terminal > New Terminal` in the top menu.
4. Start Confluent Platform by running `docker-compose up -d` on the terminal.
5. Check if the application started successfully by running `docker-compose ps` on the terminal. A table-like output will appear with a column called **STATUS** where all the values should be **"Up"**.
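As an extra sanity check (not required by the task), we can confirm that the Control Center UI is answering on port 9021; this is just a convenience sketch, since Control Center may take a minute or two to start after the containers report "Up":
```shell!
# Should print 200 once Control Center has finished starting
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:9021
```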
### Task x: Add Schema
> TODO
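Until these steps are written, the schema can be registered directly through Schema Registry's REST API instead of the Control Center UI. A minimal sketch, assuming the [`WeatherData` Schema](#WeatherData-Schema) defined below and the default `TopicNameStrategy` (subject `weather_data-value`):
```shell!
# Register the WeatherData Avro schema for the weather_data topic's values.
# The schema must be sent as an escaped JSON string in the "schema" field.
curl -X POST http://localhost:8081/subjects/weather_data-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"namespace\":\"org.feup.weather.data.avro\",\"type\":\"record\",\"name\":\"WeatherData\",\"fields\":[{\"name\":\"sensor_id\",\"type\":\"int\"},{\"name\":\"location\",\"type\":\"string\"},{\"name\":\"temperature\",\"type\":\"float\"},{\"name\":\"humidity\",\"type\":\"float\"},{\"name\":\"wind_speed\",\"type\":\"float\"}]}"}'
```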
### Task x: Produce message
Here we are going to produce a message to the `weather_data` topic (according to its schema).
1. Go to http://localhost:9021/clusters/gc3UqOebQ9iKGYN7fdUbiw/management/topics/weather_data/message-viewer where all messages received while the page is open are shown.
2. Click on ***+ Produce a new message to this topic***.
3. In the value box, insert the following example message:
```json!
{
  "sensor_id": 1,
  "location": "Lisbon, PT",
  "temperature": 23,
  "humidity": 78,
  "wind_speed": 5
}
```
4. Click on ***Produce*** and, after a few seconds, the message we've just produced will appear below. There is a row in the top left corner that, when clicked, shows the message details:

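Optionally, the produced message can also be checked from the command line. A sketch using the Avro console consumer shipped in the `schema-registry` container, assuming the value was Avro-encoded against the registered schema:
```shell!
docker exec -it schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic weather_data \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081
```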
### Task x: Produce message with custom headers
As seen in the last task, message details include three fields: **Value**, **Header**, and **Key**. However, when we were producing the example message there was no box for the *Headers* field, and that's because **Control Center doesn't allow sending headers**. So, in order to produce a message with headers, we will need to use [Command Line Interface tools](https://docs.confluent.io/kafka/operations-tools/kafka-tools.html).
1. Open a terminal.
2. Start a terminal session inside the `zookeeper` container by running:
```shell!
docker exec -it zookeeper sh
```
3. Run the following command, which starts a console producer with header parsing enabled:
```shell!
kafka-console-producer --bootstrap-server broker:29092 \
  --topic weather_data \
  --property parse.headers=true
```
4. Pass this payload:
```shell!
data_quality:high,data_source:ground_station \t {"sensor_id":2, "location":"Porto, PT", "temperature":18, "humidity":81, "wind_speed":6, "timestamp":"2024-05-13T18:03:41Z"}
```
**Note:** The `\t` in the command above represents the tab key and must be replaced by a "real tab" in the console.
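To verify that the headers were actually written, the topic can be read back with a console consumer that prints headers; a quick sketch, run in the same container session:
```shell!
kafka-console-consumer --bootstrap-server broker:29092 \
  --topic weather_data \
  --from-beginning \
  --property print.headers=true
```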
## Kadeck
### Task 1: Start Kadeck with experimental environment
1. Search for *Kadeck* and click on the application icon.
2. Click on ***Add Connection*** in the top right corner.
3. Select *Apache Kafka* and click ***Next***.
4. Fill **Connection Name** with `local`.
5. In the *Broker Configuration* tab, fill **Bootstrap Servers** with `localhost:9092`.
6. Change to the *Schema Registry* tab and enter `localhost:8081` in **Schema Registry Url**.
> -> This may be removed because it is probably not necessary
7. Change to the *Kafka Connect* tab and click on ***Add Worker***.
8. Fill the fields in the pop-up with the information below and then click on ***Save Worker***.
- **Cluster Name:** `KafkaConnect Cluster`
- **URLs:** `localhost:8083`
> ^ The section above
9. Complete this process by clicking on ***Create***.
10. The *Connection* is now created but we still need to connect to it. In the *Connection Overview* panel, click on ***Connect*** in the card for the connection we've just created.
### Task x: Add Schema
1. Go to ***Schema Registry*** by clicking its icon in the vertical left-side panel.
2. Click on ***Add Schema***.
3. Set the subject name to `WeatherData`.
4. Copy the [`WeatherData` Schema](#WeatherData-Schema) and paste it into the *Schema* box.
5. Click on ***Validate***.
6. The schema table now has a row with the schema you just created. Click on that row and all its details will appear, namely its **ID**, which is **1**.
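The registration can also be double-checked outside Kadeck through Schema Registry's standard REST endpoints, sketched here for convenience:
```shell!
# List all subjects; should include "WeatherData"
curl http://localhost:8081/subjects
# Inspect the first version of the schema (its global ID is reported in the response)
curl http://localhost:8081/subjects/WeatherData/versions/1
```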
### Task x: Produce message
1. Go to the *Data Browser* by clicking its icon in the left-side panel.
2. Select the `weather_data` topic in the left-side column.
3. Click the add-record icon, which will open a pop-up to create new records.
4. In the *Value (string)* box, insert the following information:
```json!
{
  "sensor_id": 1,
  "location": "Lisbon, PT",
  "temperature": 23,
  "humidity": 78,
  "wind_speed": 5
}
```
5. In the ***Encoder for Key*** field, select `String`.
6. In the ***Encoder for Value*** field, select `Avro`. A new field called ***Value Schema Id*** will appear, which links the schema we added in the last task to our topic message. Since the schema's **ID** is **1**, insert **1** in this field.
7. Click on ***Confirm*** to produce the message.
8. Click ***Fetch*** to update the records table, which will now have a row with the message just produced. Click on that row.
9. In the detail panel, on the right, select the *Key* tab.
10. Click on ***String Decoder*** and, after selecting the fields as shown below, click ***Confirm***. Now records will be correctly decoded, making them readable.

### Task x: Produce message with custom headers
1. Follow the same steps as in the last task, but this time with the following information:
```json!
{
  "sensor_id": 2,
  "location": "Porto, PT",
  "temperature": 18,
  "humidity": 81,
  "wind_speed": 6
}
```
2. Expand ***Show header options***, which will allow you to add custom headers.
3. Add custom headers by filling the `key` and `value` fields (keeping the `string` type) with the information below and clicking the add icon.
| Key | Value |
| -------- | -------- |
| data_quality | high |
| data_source | ground_station |
4. Click ***Confirm***.
5. Click ***Fetch*** to update the records table and select the last produced message.
6. The custom headers will be shown in the message's detail panel, in the *Meta* tab.

## Kafka-UI
### Task 1: Start Kafka-UI with experimental environment
1. Open *Docker Desktop*.
2. Open the [`docker-compose` file](#Experimental-Environment) in *Visual Studio Code*.
3. Open a new terminal in *VSCode* by clicking `Terminal > New Terminal` in the top menu.
4. Start the environment by running `docker-compose up -d` on the terminal.
5. Check if the application started successfully by running `docker-compose ps` on the terminal. A table-like output will appear with a column called **STATUS** where all the values should be **"Up"**.
6. Go to http://localhost:8080/.
7. In *Cluster name* insert `local`.
8. In *Bootstrap Servers* insert the information below.
- **Host:** `localhost`
- **Port:** `9092`
9. Click ***Configure Schema Registry*** and in *URL* insert `localhost:8081`.
10. Click ***Validate*** to check if everything is fine and then click ***Submit*** to create the cluster.
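As an alternative to the wizard, Kafka-UI also accepts the same cluster configuration through environment variables in its compose service. A sketch using the variable names from the project's documentation (the final compose file for this tool is still TODO below):
```yaml!
  kafka-ui:
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: localhost:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://localhost:8081
```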
### Task x: Add Schema
## Experimental Environment
### Confluent Control Center
```yaml!
---
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.6.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      # Confluent
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  # Confluent Control Center/Confluent Platform
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.6.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - cc-connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'cc-connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
  cc-connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    hostname: cc-connect
    container_name: cc-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: cc-connect
      CONNECT_GROUP_ID: compose-cc-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-cc-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-cc-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-cc-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.6.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.6.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - cc-connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://cc-connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.6.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - cc-connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.6.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - cc-connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
              cub kafka-ready -b broker:29092 1 40 && \
              echo Waiting for Confluent Schema Registry to be ready... && \
              cub sr-ready schema-registry 8081 40 && \
              echo Waiting a few seconds for topic creation to finish... && \
              sleep 11 && \
              tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081
  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.6.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
```
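When the experiment with this environment is over, it can be torn down with the usual compose command (`-v` also removes any named volumes):
```shell!
docker-compose down -v
```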
### Kadeck
Since Kadeck is a desktop application, the experimental environment only covers Kafka and its dependencies.
```yaml!
---
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.6.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      # Confluent
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  # Topics
  kafka-init-topics:
    image: confluentinc/cp-kafka:7.6.1
    depends_on:
      - broker
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
              cub kafka-ready -b broker:29092 1 30 && \
              kafka-topics --create --topic weather_data --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server broker:29092 && \
              kafka-topics --create --topic weather_alert --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server broker:29092'"
```
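After the containers are up, the topic creation can be verified from inside the `broker` container, which ships the Kafka CLI tools:
```shell!
# Should list weather_data and weather_alert among the topics
docker exec -it broker kafka-topics --list --bootstrap-server broker:29092
```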
### Kafka-UI
TODO
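Until the final file is written, here is a minimal sketch of the extra service needed on top of the Kadeck environment above; the `provectuslabs/kafka-ui` image and its `DYNAMIC_CONFIG_ENABLED` flag (which enables the configuration wizard used in Task 1) are assumptions based on the project's documentation:
```yaml!
  # Added alongside the services of the Kadeck environment above
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8080:8080"
    environment:
      # Allows configuring the cluster through the UI wizard instead of env vars
      DYNAMIC_CONFIG_ENABLED: 'true'
```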
### Schemas
Some of the tasks require schema definitions, so the schemas used in the experiment are given here in order to maintain uniformity throughout the process.
#### `WeatherData` Schema
```json!
{
  "namespace": "org.feup.weather.data.avro",
  "type": "record",
  "name": "WeatherData",
  "fields": [
    {
      "name": "sensor_id",
      "type": "int"
    },
    {
      "name": "location",
      "type": "string"
    },
    {
      "name": "temperature",
      "type": "float"
    },
    {
      "name": "humidity",
      "type": "float"
    },
    {
      "name": "wind_speed",
      "type": "float"
    }
  ]
}
```
#### `WeatherAlert` Schema
```json!
{
  "namespace": "org.feup.weather.alert.avro",
  "type": "record",
  "name": "WeatherAlert",
  "fields": [
    {
      "name": "alert_id",
      "type": "int"
    },
    {
      "name": "alert_type",
      "type": {
        "type": "enum",
        "name": "AlertType",
        "symbols": [
          "HEATWAVE",
          "STORM",
          "FLOOD",
          "SNOW",
          "TORNADO"
        ]
      }
    },
    {
      "name": "location",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    }
  ]
}
```
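For reference, a message conforming to this schema could look like the following (illustrative values only):
```json!
{
  "alert_id": 1,
  "alert_type": "STORM",
  "location": "Lisbon, PT",
  "description": "Strong winds and heavy rain expected in the next 6 hours."
}
```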