# Confluent-Kafka Consumer
## Prerequisites
1. Make sure the username and password have already been created in AWS Secrets Manager
2. SASL is enabled on the AWS MSK cluster
3. The producer and the AWS MSK broker domains are connected via VPN or sit in the same VPC
4. The security group allows the required inbound and outbound traffic

AWS MSK SASL authentication setup guide:
https://docs.aws.amazon.com/zh_tw/msk/latest/developerguide/msk-password.html

If you are not sure whether your environment satisfies the four points above, follow the official guide to establish EC2 -> MSK connectivity first, then try the program steps below. A sketch of the Secrets Manager setup follows.
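For reference, step 1 can be scripted with boto3. This is a minimal sketch, not the guide's exact procedure; the ARNs, key ID, and names below are hypothetical placeholders. Note that MSK requires the secret name to begin with `AmazonMSK_` and the secret to be encrypted with a customer-managed KMS key (the default Secrets Manager key is not accepted):

```python
# Hypothetical sketch: create a SASL/SCRAM secret and attach it to the MSK cluster.
# Replace the ARNs, key ID, names, and credentials with your own values.
import json
import boto3

secretsmanager = boto3.client("secretsmanager", region_name="ap-northeast-1")
kafka = boto3.client("kafka", region_name="ap-northeast-1")

# The secret name must start with "AmazonMSK_" and use a customer-managed KMS key.
secret = secretsmanager.create_secret(
    Name="AmazonMSK_teddy610",  # hypothetical name
    KmsKeyId="arn:aws:kms:ap-northeast-1:111122223333:key/your-key-id",  # hypothetical key
    SecretString=json.dumps({"username": "teddy610", "password": "teddy610"}),
)

# Associate the secret with the cluster so SASL/SCRAM authentication can use it.
kafka.batch_associate_scram_secret(
    ClusterArn="arn:aws:kafka:ap-northeast-1:111122223333:cluster/yenchi0511msk/abcd1234",  # hypothetical ARN
    SecretArnList=[secret["ARN"]],
)
```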
## Python
**Environment:**
- Ubuntu 20.04 LTS
- Python 3.8.10
- AWS MSK 2.8.1
1. Update the apt package index
```sudo apt-get update```
2. Install Docker
```sudo apt-get install -y docker.io```
3. Pull the image
```sudo docker pull yenmochi/kafka_consumer```
4. Run the container
```docker run -it yenmochi/kafka_consumer bash```
5. Change into the program directory
```cd /home```
6. Run the Python program
```
python3 kafka_consumer.py <SASL/SCRAM server> <username> <password> <group id> <topic name> <runtime>
```
**Example:**
```
python3 kafka_consumer.py b-2.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.yenchi0511msk.cme5as.c4.kafka.ap-northeast-1.amazonaws.com:9096 teddy610 teddy610 aaa test2 5
```
## Notes
1. `<SASL/SCRAM server>` is the AWS MSK SASL bootstrap server string (see the sketch after this list for one way to retrieve it)
2. `<username>` and `<password>` are the credentials configured in AWS Secrets Manager
3. `<group id>` is the consumer group ID (any string works)
4. `<topic name>` is the MSK topic name
5. `<runtime>` is how long the program runs, in seconds; setting it to 10 runs the consumer for only 10 seconds
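If the SASL/SCRAM bootstrap string is not at hand, it can be looked up with boto3. A minimal sketch, assuming boto3 is installed and the hypothetical cluster ARN below is replaced with your own:

```python
# Hypothetical sketch: look up the SASL/SCRAM bootstrap servers for an MSK cluster.
import boto3

kafka = boto3.client("kafka", region_name="ap-northeast-1")
brokers = kafka.get_bootstrap_brokers(
    ClusterArn="arn:aws:kafka:ap-northeast-1:111122223333:cluster/yenchi0511msk/abcd1234"  # hypothetical ARN
)
# The SASL/SCRAM listener (port 9096) is returned under this key.
print(brokers["BootstrapBrokerStringSaslScram"])
```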
## Python source code
```python
from confluent_kafka import Consumer, KafkaError, KafkaException
import sys
import json
from datetime import datetime
import time
import socket

hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)

# Consumer settings from the command line
servers = sys.argv[1]
username = sys.argv[2]
password = sys.argv[3]
groupid = sys.argv[4]
# Topic name
topic = [sys.argv[5]]
runtime_seconds = sys.argv[6]

conf = {'bootstrap.servers': str(servers),
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': str(username),
        'sasl.password': str(password),
        'group.id': str(groupid),
        'auto.offset.reset': 'latest',  # use 'earliest' (a.k.a. 'smallest') to replay from the beginning
        #'enable.auto.offset.store': False  # do not store offsets
        }

consumer = Consumer(conf)
output_file = open(str(hostname) + "-" + ip_address + ".txt", "w")

def msg_process(msg):
    # Stamp each JSON message with the time it was received, then print and log it.
    json_msg = json.loads(msg.value().decode('utf-8'))
    json_msg["end_time"] = datetime.now().strftime("%m/%d/%Y %H:%M:%S.%f")
    print(json_msg)
    output_file.write(json.dumps(json_msg) + "\n")

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        start_time = time.time()
        while time.time() - start_time < int(runtime_seconds):
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # Delete this line if the consumer should start from the
                # latest messages again after a restart.
                consumer.commit(asynchronous=False)
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        print("done")
        output_file.close()
        consumer.close()

consume_loop(consumer, topic)
```
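To test the consumer end to end, a message can be sent to the same topic with a matching producer. This is a minimal sketch using the same SASL/SCRAM settings; the `start_time` field is purely illustrative (the consumer above accepts any JSON object), and the angle-bracket placeholders should be filled in the same way as for the consumer:

```python
# Hypothetical test producer, assuming the same SASL/SCRAM credentials and topic.
from confluent_kafka import Producer
from datetime import datetime
import json

conf = {'bootstrap.servers': '<SASL/SCRAM server>',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<username>',
        'sasl.password': '<password>'}

producer = Producer(conf)
# "start_time" is an illustrative field; any JSON object will do.
message = {"start_time": datetime.now().strftime("%m/%d/%Y %H:%M:%S.%f")}
producer.produce('<topic name>', json.dumps(message).encode('utf-8'))
producer.flush()  # block until the message is delivered
```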