# 工業控制系統中惡意流量檢測與流量移轉 ## 說明 ### 情境 在真實的鐵路工控環境中設有機器學習異常流量檢測機制,當發現為惡意使用者時,會針對該使用者獨立開啟一個新的容器,攻擊者不會察覺自己的流量被移轉到非正式的環境,後續可以針對新產生的容器日誌進行惡意行為分析 ### 環境 1.在 ubuntu (一)上以 docker 架設 conpot 容器,conpot 容器作為合法的鐵路環境 2.在同一台 ubuntu (一)機器上,若偵測到惡意使用者IP時,會動態產生以IP命名的新容器 3.另外一台 ubuntu (二)作為 grafana 日誌管理平台 ## 架構圖 ![截圖 2025-05-03 上午10.45.10](https://hackmd.io/_uploads/H1xUmWXlxl.png) ![E6C28C78-2661-4E87-8ECD-1C2F98385BC8](https://hackmd.io/_uploads/H1VUkDIexx.jpg) --- # 工控基本設備 | 項目 | PLC (可程式邏輯控制器) | SCADA (監控與數據採集系統) | HMI (人機介面) | | -------- | ------------------------ | ----------------------- | --------------- | | **主要功能** | 控制現場設備的運行 | 監控、收集來自 PLC 的數據、報警、數據分析 | 用戶與控制系統的交互界面 | | **用途** | 自動化控制設備 | 數據監控、報告與管理系統 | 顯示設備狀態,接受用戶命令 | | **系統層次** | 基層設備控制 | 高層次監控系統 | 用戶操作界面 | * 本專題模擬三個 PLC 設備 - 信號燈 - 平交道控制器 - 道岔控制器 * 本專題 HMI 介面使用 FUXA # 環境模擬 ## Slave ID 與 Address差異 | 項目 | Slave ID | Address(資料位址) | | ---- | -------------------- | ------------------------ | | 定義 | 指定哪一台從站設備 | 指定該設備內的哪個資料欄位 | | 功能層級 | 封包中的「設備位址」欄位 | 封包中的「資料欄位」 | | 應用 | 跟哪個設備通訊 | 要讀寫設備中的哪一個 Coil/Register | | 範圍 | 1~247(RTU),0 是廣播(選用) | 通常是 0~65535,依資料類型而定 | ## 模擬真實火車控制環境設定 在本專題中,為了模擬真實鐵路工控場域,在 Conpot 中針對 `modbus.xml` 進行自訂,設置了三大類常見的鐵道設備控制項目: 1. 火車信號燈控制 (Slave ID 3) * 透過 Coils 區域模擬紅燈與綠燈的狀態切換。 * 紅燈(`trainSignalRedLight`)位於地址 1,綠燈(`trainSignalGreenLight`)位於地址 2。 2. 平交道控制器 (Slave ID 4) * 以單一 Coil 控制平交道柵欄開關狀態。 * 對應地址為 3,變數名稱為 `railwayCrossingBarrier`。 3. 道岔控制器 (Slave ID 5) * 模擬鐵軌轉轍器(道岔)的位置切換。 使用一個 Coil 在地址 4,變數名稱為 `switchTrackPosition`。 ## 模擬數值變化設定 在 `template.xml` 中,為了使模擬環境更加貼近真實工控系統隨機變化的特性,我針對各項控制設備的狀態,配置了隨機變化的模板設定: * 火車信號燈控制 - `trainSignalRedLight`、`trainSignalGreenLight` - 狀態以 隨機生成 0 或 1 的方式模擬紅燈/綠燈的開關變化。 - 0 代表燈號關閉,1 代表燈號開啟。 * 平交道控制器 - `railwayCrossingBarrier` - 隨機產生 0 或 1,模擬平交道柵欄的啟閉狀態。 - 0 表示開啟(柵欄升起),1 表示關閉(柵欄降下)。 * 道岔控制器 - `switchTrackPosition` - 同樣以 0 或 1 的隨機數值模擬鐵軌轉向情境。 - 0 代表直行軌道,1 代表切換至側線。 ### 補充說明 * 本設計遵循了工控常見的 Modbus 通訊協定,以 Coils (0x) 類型模擬開關型裝置(即只需判斷開/關兩種狀態)。 * 針對每一類設備,都以獨立的 Slave ID 模擬不同控制器,貼近實際 ICS (Industrial Control Systems) 環境中多元設備分散部署的特性。 * 後續攻擊者若針對特定 Modbus 地址範圍進行掃描或操作,即可誘發異常行為,進而被本系統檢測並隔離。 ## mbtget 工具 mbtget 是Linux/Unix 系統上常見的一個工具,用於 Modbus 通訊協定中的命令行工具,用於從 Modbus 設備讀取數據。 以我們的環境為例子: 將紅燈打開 ``` mbtget -u 3 -a 1 -w5 1 -p 5020 192.168.14.149 ``` 短時間快速傳送封包 ``` for i in {1..10}; do mbtget -u 3 -a 1 -w5 1 -p 5020 192.168.14.149; sleep 0.5; done ``` * -u 3:目標 Slave ID。 * -a 1:Modbus 地址: 1(紅燈)/2 (綠燈) * -w5 1:寫入值 1,表示開啟 / 寫入值 0,表示關閉。 * -p 5020:使用自訂埠號 5020。 --- # HMI介面 FUXA 是一個開源的 SCADA/HMI Web 應用程式,用來監控與控制工業自動化設備。它可以透過網頁介面建立與操作監控畫面(如儀表板、動畫、報警、數據紀錄等),支援多種通訊協定(如 Modbus、OPC UA、MQTT)。 示意圖: ![FUXA HMI](https://hackmd.io/_uploads/H1YR3MDxlg.png) | PLC名稱 | Slave ID | Starting Address (位址) | | :------------------- | :------- | :-------------------- | | PLC-1-traffic-light | 3 | 1、2 | | PLC-2-level-crossing | 4 | 3 | | PLC-2-turnout | 5 | 4 | ## 動態產生的FUXA與conpot連線 複製新 FUXA 的 image 檔案 ``` docker commit thirsty_varahamihira new_fuxa ``` 創立新的 network ``` docker network create --driver bridge isolation ``` 確認成功創建 ``` docker network ls ``` 啟動新FUXA容器 ``` docker run -d --name new_fuxa_container --network isolation -p 1882:1881 new_fuxa ``` --- # 異常偵測 ## 創建 `attacker_ips` 名單 創建這個名單,將各種偵測方式判斷出的異常IP統一集中在此,後續可以讓腳本自動偵測此名單內的IP來自動啟動新容器 ```bash sudo ipset create attacker_ips hash:ip ``` - 這個 `attacker_ips` 集合會存放 **攻擊者的 IP** 查看 ipset 名單 ``` sudo ipset list attacker_ips ``` ![ipset](https://hackmd.io/_uploads/HJcreDIell.png) ## 三種異常偵測方法 ### (1)iptables iptables為Linux系統中的防火牆,可以藉此來達到異常偵測 範例: ``` sudo iptables -I DOCKER-USER 1 -p tcp --dport 5020 -m recent --set --name MODBUS_ATTACK sudo iptables -I DOCKER-USER 2 -p tcp --dport 5020 -m recent --update --seconds 10 --hitcount 5 --name MODBUS_ATTACK -j SET --add-set attacker_ips src sudo iptables -A DOCKER-USER -j RETURN # 確保其他流量繼續處理 ``` 1. 第一條規則 * 當有新的 TCP 連接嘗試訪問 5020 端口時,將來源 IP 記錄到 `MODBUS_ATTACK` 列表,並標記時間戳 2. 第二條規則 * 監控 `MODBUS_ATTACK` 列表中的 IP,如果某個 IP 在 60 秒內超過 5 次連接嘗試,就將其加入 `attacker_ips` 集合。 ### (2)gemini API 將流量轉成文字格式後,交給 LLM(大語言模型)分析,根據語意判斷是否為攻擊、類型、來源 IP。 * 前置作業 - gemini API KEY - 安裝 google-generativeai `pip install google-generativeai` 1. 錄製流量 ``` def capture_traffic(interface, duration, output_pcap): """使用 Wireshark 監控網路流量並儲存為 pcap 檔案。""" try: subprocess.run(["tshark", "-i", interface, "-a", f"duration:{duration}", "-w", output_pcap], check=True) print(f"流量監控完成,檔案儲存於:{output_pcap}") return True except subprocess.CalledProcessError as e: print(f"流量監控失敗:{e}") return False ``` 2. 轉換.pcap檔成.txt ``` def pcap_to_text(pcap_file, text_file): """將 pcap 檔案轉換為純文字檔案。""" try: with open(text_file, 'w') as f: subprocess.run([ "tshark", "-r", pcap_file, "-T", "fields", "-e", "frame.number", "-e", "frame.time", "-e", "ip.src", "-e", "ip.dst", "-e", "frame.protocols", "-e", "frame.len", "-e", "tcp.srcport", "-e", "tcp.dstport", "-e", "tcp.flags", "-e", "udp.srcport", "-e", "udp.dstport" ], stdout=f, check=True) print(f"pcap 檔案轉換完成,檔案儲存於:{text_file}") return True except subprocess.CalledProcessError as e: print(f"pcap 檔案轉換失敗:{e}") return False ``` - 說明 ::: spoiler * frame.number:封包編號。 * frame.time:封包時間。 * ip.src:來源 IP 位址。 * ip.dst:目的 IP 位址。 * frame.protocols:封包協定。 * frame.len:封包長度。 * tcp.srcport:TCP 來源埠號。 * tcp.dstport:TCP 目的埠號。 * tcp.flags:TCP 旗標。 * udp.srcport:UDP 來源埠號。 * udp.dstport:UDP 目的埠號。 ::: 3. 選擇模型 * 可以參考[這邊](**https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models**) 4. 輸入prompt 個人認為`只需要回答我這三個問題,不需要回答其他解釋`很重要,否則他會跟你阿哩阿扎說很多解釋,盡可能標準化輸出結果 ``` prompt = f""" 根據以下的輸出,回答我以下3個問題,只需要回答我這三個問題,不需要回答其他解釋 1.是否為攻擊行為(ping請判定為攻擊行為)?(回答是/否) 2.是哪一種攻擊(若1為否則回答無攻擊) 3.攻擊者IP {text_data} """ ``` ### (3)Machine Learning 自行訓練的模型,用於分析流量模式(例如特徵向量、時間序列)判斷異常。 * 透過滑動時間窗口(近 30 秒內)收集封包。 * 把封包特徵(像protocol、function code)轉成一組文字 token。 * 用事先訓練好的 Word2Vec 模型把這組 token 轉成向量。 * 計算這組向量跟「正常行為資料庫」的相似度。 * 如果相似度低於某個閾值(0.93),就判定為異常,並且把該來源 IP 加到 ipset 黑名單。 ![擷取](https://hackmd.io/_uploads/BkSHkvIllx.png) ## 三個檢測方法之優先順序 1. **iptables** : **最快速、最低誤判率**:很適合當第一層防禦,只要超過連線次數就馬上抓,不需要分析、延遲極低。作為初步擋住明顯的暴力攻擊。 2. **機器學習模型** : **適中速度,有泛化能力**:可以偵測到比較隱藏的異常模式(像是慢速掃描、微量滲透),但要注意模型誤判(False Positive/False Negative)問題。 3. **Gemini API** : **精緻但延遲高**:語言模型的分析比較細膩,可以辨識複雜攻擊行為,但是轉檔、丟API、回應有延遲,不適合第一時間阻擋。比較適合做「**第二層確認**」或「**事件深度分析**」。 **多層次防禦流程** 1. 先用 iptables 快速擋住 70% 的明顯攻擊。 1. 剩下進來的流量,再丟給 **機器學習模型** 判別。 1. 如果機器學習模型標記為可疑,平行開啟 Gemini分析做進一步判讀、分類、紀錄證據。 --- # 流量移轉 ## (1)新容器與合法環境的一致性 為了讓攻擊無法察覺自己的流量被移轉,必須產生一個與合法環境一模一樣的環境 **透過 docker commit,將合法環境保存為新的映像 conpot_clean** ``` docker commit legit_conpot conpot_clean ``` * `legal_conpot`為合法環境 * `conpot_clearn`為新的映像檔 --- ## (2)流量移轉--iptables 透過iptables來寫規則,將異常的IP記錄在先前創立的`attacker_ips` ``` sudo iptables -I DOCKER-USER 1 -p tcp --dport 5020 -m recent --set --name MODBUS_ATTACK sudo iptables -I DOCKER-USER 2 -p tcp --dport 5020 -m recent --update --seconds 10 --hitcount 5 --name MODBUS_ATTACK -j SET --add-set attacker_ips src sudo iptables -A DOCKER-USER -j RETURN ``` * 規則一:當有新的 TCP 連接嘗試訪問 5020 端口時,將來源 IP 記錄到 `MODBUS_ATTACK` 列表,並標記時間戳 * 規則二:監控 `MODBUS_ATTACK` 列表中的 IP,如果某個 IP 在 60 秒內超過 5 次連接嘗試,就將其加入 `attacker_ips` 集合。 ### 封包處理流程 查看iptables規則 ``` sudo iptables -t nat -L -n -v ``` 當封包從外部(例如 Kali 的 192.168.14.139)發送到你的主機(192.168.14.149:5020)時,經過以下階段: **1. 進入系統** * 封包首先到達 PREROUTING 鏈(nat 表)。 * 在這裡,Docker 的 NAT 規則(DOCKER 鏈)會檢查是否需要將封包轉換為容器地址(例如 172.17.0.2:5020)。 * 例如: ``` DNAT tcp -- !docker0 * 0.0.0.0/0 0.0.0.0/0 tcp dpt:5020 to:172.17.0.2:5020 ``` **2. 路由決定** * 系統決定封包是發往本機(INPUT)還是轉發(FORWARD)。 * 由於 Docker 使用 NAT,封包的目的地址被改為 172.17.0.2:5020,路由會選擇 FORWARD **3. 轉發階段** * 封包進入 FORWARD 鏈(filter 表) * 在這裡,Docker 設置了規則: ``` DOCKER-USER 0 -- * * 0.0.0.0/0 0.0.0.0/0 ``` * DOCKER-USER 是 FORWARD 鏈的第一步,我們的異常檢測規則在這裡。 **4. 最終轉發** * 如果 DOCKER-USER 允許(RETURN),封包繼續到 DOCKER 鏈(nat 表),完成轉發 --- ## (3)以個別IP啟動各自的新容器 目的是為了方便後須的分析,一個容器只會紀錄該使用者的紀錄 ``` # 針對每個攻擊者 IP 啟動 Conpot 蜜罐 if [ -n "$ATTACKER_IPS" ]; then for IP in $ATTACKER_IPS; do CONPOT_NAME="conpot_$IP" CONPOT_NAME=$(echo "$CONPOT_NAME" | tr '.' '_') BACKUP_PATH="$BACKUP_DIR/$CONPOT_NAME" ``` * 新容器名稱以IP命名 ``` docker run --name "$CONPOT_NAME" \ -v /home/bbrain/conpot_logs/new_machine:/var/log/conpot \ -v "$BACKUP_PATH":/conpot/data \ -p "$PORT_5020:5020" \ -p "$PORT_161:161/udp" \ -p "$PORT_20000:20000" \ -d conpot_clean \ /home/conpot/.local/bin/conpot \ --template /home/conpot/.local/lib/python3.6/site-packages/conpot-0.6.0-py3.6.egg/conpot/templates/default/ \ --config /home/conpot/.local/lib/python3.6/site-packages/conpot-0.6.0-py3.6.egg/conpot/conpot.cfg ``` * 啟動新容器 * `-v /home/bbrain/conpot_logs/new_machine:/var/log/conpot`將新容器的log檔映射到ubuntu本機上,以利後須做日誌集中管理 --- # 日誌集中管理 我們原先採用較有名的ELK,但是實在太耗系統資源了,原是採用輕量級的**Loki + Promtail + Grafana** * Loki跟Grafana安裝在ubuntu(二)上 * Protail安裝在ubuntu(一)上 在ubuntu(一)中可以在`promtail-config.yaml`設定要推送到Loki的日誌 ``` - job_name: "Attacker_logs" static_configs: - targets: - localhost labels: job: "Attacker_log" host: "conpot_server" __path__: "/home/bbrain/conpot_logs/new_machine/conpot.json" ``` 設定好之後執行`./promtail-linux-amd64 -config.file=promtail-config.yaml &`就可以在ubuntu(二)的瀏覽器`http://<ubuntu(二)IP>:3100/ready`查看日誌 ## grafana mcp server **讓 grafana 與 mcp server 串起來更方便logs的分析** 1. 安裝go 2. 安裝 mcp-grafana 3. 安裝 MCP-Host 端 (claude desktop) 4. 取得grafana API KEY 5. 編輯 cluade desktop 設定檔 詳細內容看[這邊](https://hackmd.io/@BBrain/ryL-6Uk0kl) --- # Demo 1.[iptable偵測+流量移轉](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EVlNg0ygwddNpeZ_SCbiiDQBJH4yrvhByI0xOKOFt_IDkA?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=czdKJv) 2.[Gemini API 偵測+流量移轉](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EZu3ngp3UyVGlLKNstgYueEBzwcxT-q8gTndz8EA_IrAkg?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=hhb477) 3.[機器學習模型偵測](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EcGGsKGGxtVGs-y3i2uN4okBxutKd5D7x5oFm_xOUGbbOg?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=szkocg) 4.[在Claude上透過MCP分析事件](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EeOBRVrBPpxJqfzfQXvGmEUBLFeXOFUDkvZw-t0lZPWX6g?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=tT7pex) --- # Github 1. [專題程式碼](https://github.com/BBrain778/ICS_conpot) 1. [Conpot開源專案](https://github.com/mushorg/conpot) 2. [FUXA開源碼網頁式圖控平台](https://github.com/frangoteam/FUXA)