# Confluent-Kafka Producer

## Prerequisites

1. The username and password have been created in AWS Secrets Manager.
2. SASL is enabled on AWS MSK.
3. The producer and the AWS MSK brokers are connected over a VPN or are located in the same VPC.
4. The security group has been confirmed to allow the required inbound and outbound traffic.

AWS MSK SASL authentication steps: https://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/msk-password.html

If you are not sure whether your environment meets the four points above, first follow the official documentation linked above to get an EC2 -> MSK connection working, then try the steps below.

## Python

**Environment:**

- Ubuntu 20.04 LTS
- Python 3.8.10
- AWS MSK 2.8.1

First create a topic on AWS MSK:

```
<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --zookeeper ZookeeperConnectString --replication-factor 3 --partitions 1 --topic ExampleTopicName
```

1. Install confluent-kafka

```
pip install confluent-kafka
```

2. Producer code **(replace servers, sasl.username, sasl.password, and the topic name)**

```python
from confluent_kafka import Producer
from datetime import datetime
import json


def main():
    # SASL private endpoint (single-VPC)
    servers = "b-1.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-2.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096"
    producer = Producer({
        'bootstrap.servers': servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': 'teddy610',  # username stored in the AWS Secrets Manager secret
        'sasl.password': 'teddy610',  # password stored in the AWS Secrets Manager secret
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'delivery.timeout.ms': 30000  # delivery timeout in milliseconds
    })
    count = 1
    data = {
        'start_time': datetime.now().strftime("%m/%d/%Y %H:%M:%S.%f"),
        'value': "haha good",
        'order_id': count
    }
    # 'test2' is the topic name; replace it with your own
    producer.produce('test2', json.dumps(data).encode('utf-8'))
    # producer.poll(1)  # asynchronous write: serve delivery events without waiting for the queue to drain
    producer.flush()  # synchronous write: block until all queued messages are delivered


if __name__ == "__main__":
    main()
```

Confluent-kafka reference: https://docs.confluent.io/kafka-clients/python/current/overview.html

## Java

**Environment:**

- Ubuntu 20.04 LTS
- Java 11
- AWS MSK 2.8.1

1. Install Maven

```
sudo apt update
sudo apt install maven
```

2. Create the project directory

```
mkdir my-kafka-project
cd my-kafka-project
```

3. 
Create a Maven Java project

```
mvn archetype:generate -DgroupId=com.example.kafka -DartifactId=my-kafka-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
```

4. Go into `my-kafka-app` and edit the `pom.xml` file

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.kafka</groupId>
  <artifactId>my-kafka-app</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>my-kafka-app</name>
  <url>http://maven.apache.org</url>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>11</source>
          <target>11</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.6</version> <!-- use a version that suits your environment -->
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>7.0.1-ccs</version>
    </dependency>
  </dependencies>
</project>
```

5. Update the dependencies

```
mvn clean install -U
```

6. 
Go to the `/my-kafka-project/my-kafka-app/src/main/java/com/example/kafka` directory and create the KafkaProducerExample.java program

**KafkaProducerExample.java (replace servers, sasl.username, sasl.password, and the topic name)**

```java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Kafka SASL bootstrap addresses
        String bootstrapServers = "b-1.yenchi0511msk.m124wg.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-2.yenchi0511msk.m124wg.c4.kafka.ap-northeast-1.amazonaws.com:9096";
        // Kafka topic
        String topic = "test100";
        // SASL username and password
        String username = "teddy610";
        String password = "teddy610";

        // Configure the Kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Add the SASL configuration
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "SCRAM-SHA-512");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                        + username + "\" password=\"" + password + "\";");

        // Create the Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // Send a message
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "Hello, Kafka!");
        producer.send(record);

        // Close the producer; close() waits for previously sent records to complete
        producer.close();
    }
}
```

7. Return to `/home/ubuntu/my-kafka-project/my-kafka-app`, then compile and run the Java program

```
mvn compile
mvn exec:java -Dexec.mainClass="com.example.kafka.KafkaProducerExample"
```

Confluent-kafka reference: https://docs.confluent.io/kafka-clients/java/current/overview.html
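
Returning to the Python producer: the example in this guide writes the SASL username and password directly into the source and does not report whether the message was delivered. The following is only a minimal sketch of one way to address both points, not part of the original setup. It assumes the connection settings are supplied through three environment variables whose names (`MSK_BOOTSTRAP_SERVERS`, `MSK_SASL_USERNAME`, `MSK_SASL_PASSWORD`) are made up for this illustration, and the helper names `build_producer_config` and `delivery_report` are likewise hypothetical. The `on_delivery` argument is the per-message delivery callback of confluent-kafka's `produce()`.

```python
import json
import os


def build_producer_config(servers, username, password):
    """Build a confluent-kafka config dict for SASL/SCRAM over TLS.

    Hypothetical helper: it mirrors the settings used in the Python
    producer of this guide, but takes the secrets as arguments.
    """
    return {
        "bootstrap.servers": servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
        "delivery.timeout.ms": 30000,  # delivery timeout in milliseconds
    }


def delivery_report(err, msg):
    """Delivery callback, invoked from poll() or flush() once per message."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


def main():
    # Imported here so the helpers above can be used without the package installed.
    from confluent_kafka import Producer

    # Assumed environment variable names; adjust them to your own setup.
    config = build_producer_config(
        os.environ["MSK_BOOTSTRAP_SERVERS"],
        os.environ["MSK_SASL_USERNAME"],
        os.environ["MSK_SASL_PASSWORD"],
    )
    producer = Producer(config)
    payload = json.dumps({"value": "haha good", "order_id": 1}).encode("utf-8")
    # 'test2' is the topic name from the example in this guide; replace it with your own.
    producer.produce("test2", payload, on_delivery=delivery_report)
    producer.flush()  # block until the message is delivered or fails
```

With the three variables exported in the shell, calling `main()` sends one message and prints either the partition and offset it was written to or the delivery error, which can help when checking the VPC and security group prerequisites.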