# Confluent-Kafka Consumer

## Prerequisites

1. The username and password have been created in AWS Secrets Manager.
2. SASL is enabled on AWS MSK.
3. The consumer host and the AWS MSK brokers are connected over a VPN or sit in the same VPC.
4. The security group allows the required inbound and outbound traffic.

AWS MSK SASL authentication steps: https://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/msk-password.html

If you are not sure whether your environment meets the four points above, first follow the official documentation linked above to get EC2->MSK working, then try the steps below.

## Python

**Environment:**

- Ubuntu 20.04 LTS
- Python 3.8.10
- AWS MSK 2.8.1

1. Update the Ubuntu apt package index

   ```
   sudo apt-get update
   ```

2. Install docker

   ```
   sudo apt-get install -y docker.io
   ```

3. Pull the image

   ```
   sudo docker pull yenmochi/kafka_consumer
   ```

4. Run the container

   ```
   sudo docker run -it yenmochi/kafka_consumer bash
   ```

5. Change to the program directory

   ```
   cd /home
   ```

6. Run the Python program

   ```
   python3 kafka_consumer.py <SASL/SCRAM server> <username> <password> <group id> <topic name> <runtime>
   ```

**Example:** `python3 kafka_consumer.py b-2.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096 teddy610 teddy610 aaa test2 5`

## **Notes**

**1. `<SASL/SCRAM server>` is the AWS MSK SASL bootstrap server.**

**2. `<username>` and `<password>` are the username and password configured in AWS Secrets Manager.**

**3. `<group id>` is the consumer group ID (any value works).**

**4. `<topic name>` is the MSK topic name.**

**5. `<runtime>` is how long the program runs, in seconds; setting it to 10 makes it run for only 10 seconds.**

## **Python code**

```python
import json
import socket
import sys
import time
from datetime import datetime

from confluent_kafka import Consumer, KafkaError, KafkaException

hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)

# Consumer settings, read from the command line
servers = sys.argv[1]
username = sys.argv[2]
password = sys.argv[3]
groupid = sys.argv[4]
# Topic name (subscribe() expects a list)
topic = [sys.argv[5]]
runtime_seconds = int(sys.argv[6])

conf = {'bootstrap.servers': str(servers),
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': str(username),
        'sasl.password': str(password),
        'group.id': str(groupid),
        'auto.offset.reset': 'latest',  # alternative: 'smallest'
        # 'enable.auto.offset.store': False  # do not commit offsets
        }

consumer = Consumer(conf)
# One output file per host, named <hostname>-<ip>.txt
output_file = open(str(hostname) + "-" + ip_address + ".txt", "w")


def msg_process(msg):
    # Decode the JSON payload and stamp it with the time it was consumed
    json_msg = json.loads(msg.value().decode('utf-8'))
    json_msg["end_time"] = datetime.now().strftime("%m/%d/%Y %H:%M:%S.%f")
    print(json_msg)
    output_file.write(json.dumps(json_msg) + "\n")


def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        start_time = time.time()
        # Poll until the requested runtime has elapsed
        while time.time() - start_time < runtime_seconds:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # Delete this line if the consumer should also start from
                # the latest messages after a restart
                consumer.commit(asynchronous=False)
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        print("done")
        output_file.close()
        consumer.close()


consume_loop(consumer, topic)
```
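The script reads its six arguments directly from `sys.argv`, so a missing argument or a non-numeric runtime only surfaces as an `IndexError` or `ValueError` at startup. The following is a minimal sketch of validating them up front with `argparse`, assuming the same positional order as the command above. `parse_args` and `build_conf` are hypothetical helper names that are not part of the original script, and the broker addresses and credentials in the usage lines are placeholders.

```python
import argparse


def parse_args(argv):
    """Parse the six positional arguments in the order the script expects."""
    parser = argparse.ArgumentParser(description="MSK SASL/SCRAM consumer (sketch)")
    parser.add_argument("servers", help="comma-separated SASL/SCRAM bootstrap servers")
    parser.add_argument("username", help="user name stored in AWS Secrets Manager")
    parser.add_argument("password", help="password stored in AWS Secrets Manager")
    parser.add_argument("group_id", help="consumer group ID")
    parser.add_argument("topic", help="MSK topic name")
    parser.add_argument("runtime", type=int, help="seconds to keep polling")
    return parser.parse_args(argv)


def build_conf(args):
    """Build the same configuration dictionary the script passes to Consumer."""
    return {
        "bootstrap.servers": args.servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": args.username,
        "sasl.password": args.password,
        "group.id": args.group_id,
        "auto.offset.reset": "latest",
    }


# Placeholder values for illustration only; real runs would pass sys.argv[1:].
example = parse_args(["broker-1:9096,broker-2:9096", "alice", "secret",
                      "demo-group", "demo-topic", "5"])
conf = build_conf(example)
print(conf["bootstrap.servers"], example.topic, example.runtime)
```

With this approach, running the program with too few arguments or a runtime such as `abc` prints a usage message and exits before any connection to the brokers is attempted. The resulting `conf` dictionary can be passed to `Consumer(conf)` exactly as in the script.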