# Confluent-Kafka Producer
## Prerequisites
1. Make sure the username and password have been created in AWS Secrets Manager
2. SASL is enabled on the AWS MSK cluster
3. The producer can reach the AWS MSK broker endpoints, either over a VPN connection or from within the same VPC
4. The security group allows the required inbound and outbound traffic

AWS MSK SASL authentication steps:
https://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/msk-password.html

If you are not sure whether your environment satisfies the four points above, first follow the official guide to get EC2 -> MSK working, then try the program steps below.
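Before touching Kafka at all, points 3 and 4 can be sanity-checked with a plain TCP connection to a broker's SASL port. Below is a minimal stdlib Python sketch; the hostname is a placeholder to replace with one of your MSK broker endpoints. A successful connection only confirms network and security-group reachability, not that your SASL credentials are valid.

```python
import socket

def broker_reachable(host: str, port: int = 9096, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failure, refusal, and timeout
        return False

if __name__ == "__main__":
    # Placeholder endpoint; substitute your own MSK broker hostname.
    print(broker_reachable("b-1.example.kafka.ap-northeast-1.amazonaws.com"))
```

If this returns False, revisit the VPN/VPC routing and security-group rules before debugging the producer code itself.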
## Python
**Environment:**
Ubuntu 20.04 LTS
Python 3.8.10
AWS MSK 2.8.1

Create the topic in AWS MSK first:
```
<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --zookeeper ZookeeperConnectString --replication-factor 3 --partitions 1 --topic ExampleTopicName
```
1. Install confluent-kafka
```pip install confluent-kafka```
2. Producer code
**(replace servers, sasl.username, sasl.password, and the topic name)**
```python
from confluent_kafka import Producer
from datetime import datetime
import json

def main():
    # SASL private endpoint (single-VPC)
    servers = "b-1.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-2.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096"
    producer = Producer({
        'bootstrap.servers': servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': 'teddy610',  # username stored in AWS Secrets Manager
        'sasl.password': 'teddy610',  # password stored in AWS Secrets Manager
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'delivery.timeout.ms': 30000  # delivery timeout in ms
    })
    count = 1
    data = {
        'start_time': datetime.now().strftime("%m/%d/%Y %H:%M:%S.%f"),
        'value': "haha good",
        'order_id': count
    }
    producer.produce('test2', json.dumps(data).encode('utf-8'))  # 'test2' is the topic name; replace with your own
    # producer.poll(1)  # asynchronous write: serve delivery callbacks
    producer.flush()  # synchronous write: block until delivery completes

if __name__ == "__main__":
    main()
```
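The commented-out `poll()` call above hints at asynchronous delivery: confluent-kafka reports per-message success or failure through a delivery callback served by `poll()`/`flush()`. Here is a minimal sketch of that pattern; the `Producer` wiring is left as comments because it needs a reachable broker, and the payload builder mirrors the example above.

```python
import json
from datetime import datetime

def build_message(count: int) -> bytes:
    """Build the same JSON payload as the producer example above."""
    data = {
        'start_time': datetime.now().strftime("%m/%d/%Y %H:%M:%S.%f"),
        'value': "haha good",
        'order_id': count,
    }
    return json.dumps(data).encode('utf-8')

def delivery_report(err, msg):
    """Invoked once per message from poll()/flush() with the delivery result."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# With a reachable broker, the callback is wired in like this:
# producer.produce('test2', build_message(1), callback=delivery_report)
# producer.poll(0)   # serve queued delivery callbacks without blocking
# producer.flush()   # block until all outstanding messages are delivered
```

Prefer `poll()` in a produce loop for throughput; reserve `flush()` for shutdown, since it blocks until every queued message is delivered or times out.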
Confluent-kafka reference: https://docs.confluent.io/kafka-clients/python/current/overview.html
## Java
**Environment:**
Ubuntu 20.04 LTS
Java 11.0
AWS MSK 2.8.1
1. Install Maven
```
sudo apt update
sudo apt install maven
```
2. Create a project directory
```
mkdir my-kafka-project
cd my-kafka-project
```
3. Generate a Maven Java project
```
mvn archetype:generate -DgroupId=com.example.kafka -DartifactId=my-kafka-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
```
4. Enter ```my-kafka-app``` and edit the pom.xml file
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.kafka</groupId>
  <artifactId>my-kafka-app</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>my-kafka-app</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>11</source>
          <target>11</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.6</version> <!-- use a suitable version -->
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>7.0.1-ccs</version>
    </dependency>
  </dependencies>
</project>
```
5. Update the dependencies
```mvn clean install -U```
6. Go to the ```/my-kafka-project/my-kafka-app/src/main/java/com/example/kafka``` directory and create the KafkaProducerExample.java program
**KafkaProducerExample.java (replace servers, sasl.username, sasl.password, and the topic name)**
```java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Kafka SASL bootstrap servers
        String bootstrapServers = "b-1.yenchi0511msk.m124wg.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-2.yenchi0511msk.m124wg.c4.kafka.ap-northeast-1.amazonaws.com:9096";
        // Kafka topic
        String topic = "test100";
        // SASL username and password
        String username = "teddy610";
        String password = "teddy610";

        // Configure the Kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Add the SASL configuration
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "SCRAM-SHA-512");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

        // Create the Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // Send a message
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "Hello, Kafka!");
        producer.send(record);

        // Close the producer (flushes any outstanding messages)
        producer.close();
    }
}
```
7. Go back to ```/home/ubuntu/my-kafka-project/my-kafka-app```, then compile and run the Java program
```
mvn compile
mvn exec:java -Dexec.mainClass="com.example.kafka.KafkaProducerExample"
```
Confluent-kafka reference:
https://docs.confluent.io/kafka-clients/java/current/overview.html