# 工業控制系統中惡意流量檢測與流量移轉
## 說明
### 情境
在真實的鐵路工控環境中設有機器學習異常流量檢測機制,當發現為惡意使用者時,會針對該使用者獨立開啟一個新的容器,攻擊者不會察覺自己的流量被移轉到非正式的環境,後續可以針對新產生的容器日誌進行惡意行為分析
### 環境
1.在 ubuntu (一)上以 docker 架設 conpot 容器,conpot 容器作為合法的鐵路環境
2.在同一台 ubuntu (一)機器上,若偵測到惡意使用者IP時,會動態產生以IP命名的新容器
3.另外一台 ubuntu (二)作為 grafana 日誌管理平台
## 架構圖


---
# 工控基本設備
| 項目 | PLC (可程式邏輯控制器) | SCADA (監控與數據採集系統) | HMI (人機介面) |
| -------- | ------------------------ | ----------------------- | --------------- |
| **主要功能** | 控制現場設備的運行 | 監控、收集來自 PLC 的數據、報警、數據分析 | 用戶與控制系統的交互界面 |
| **用途** | 自動化控制設備 | 數據監控、報告與管理系統 | 顯示設備狀態,接受用戶命令 |
| **系統層次** | 基層設備控制 | 高層次監控系統 | 用戶操作界面 |
* 本專題模擬三個 PLC 設備
- 信號燈
- 平交道控制器
- 道岔控制器
* 本專題 HMI 介面使用 FUXA
# 環境模擬
## Slave ID 與 Address差異
| 項目 | Slave ID | Address(資料位址) |
| ---- | -------------------- | ------------------------ |
| 定義 | 指定哪一台從站設備 | 指定該設備內的哪個資料欄位 |
| 功能層級 | 封包中的「設備位址」欄位 | 封包中的「資料欄位」 |
| 應用 | 跟哪個設備通訊 | 要讀寫設備中的哪一個 Coil/Register |
| 範圍 | 1~247(RTU),0 是廣播(選用) | 通常是 0~65535,依資料類型而定 |
## 模擬真實火車控制環境設定
在本專題中,為了模擬真實鐵路工控場域,在 Conpot 中針對 `modbus.xml` 進行自訂,設置了三大類常見的鐵道設備控制項目:
1. 火車信號燈控制 (Slave ID 3)
* 透過 Coils 區域模擬紅燈與綠燈的狀態切換。
* 紅燈(`trainSignalRedLight`)位於地址 1,綠燈(`trainSignalGreenLight`)位於地址 2。
2. 平交道控制器 (Slave ID 4)
* 以單一 Coil 控制平交道柵欄開關狀態。
* 對應地址為 3,變數名稱為 `railwayCrossingBarrier`。
3. 道岔控制器 (Slave ID 5)
* 模擬鐵軌轉轍器(道岔)的位置切換。
使用一個 Coil 在地址 4,變數名稱為 `switchTrackPosition`。
## 模擬數值變化設定
在 `template.xml` 中,為了使模擬環境更加貼近真實工控系統隨機變化的特性,我針對各項控制設備的狀態,配置了隨機變化的模板設定:
* 火車信號燈控制
- `trainSignalRedLight`、`trainSignalGreenLight`
- 狀態以 隨機生成 0 或 1 的方式模擬紅燈/綠燈的開關變化。
- 0 代表燈號關閉,1 代表燈號開啟。
* 平交道控制器
- `railwayCrossingBarrier`
- 隨機產生 0 或 1,模擬平交道柵欄的啟閉狀態。
- 0 表示開啟(柵欄升起),1 表示關閉(柵欄降下)。
* 道岔控制器
- `switchTrackPosition`
- 同樣以 0 或 1 的隨機數值模擬鐵軌轉向情境。
- 0 代表直行軌道,1 代表切換至側線。
### 補充說明
* 本設計遵循了工控常見的 Modbus 通訊協定,以 Coils (0x) 類型模擬開關型裝置(即只需判斷開/關兩種狀態)。
* 針對每一類設備,都以獨立的 Slave ID 模擬不同控制器,貼近實際 ICS (Industrial Control Systems) 環境中多元設備分散部署的特性。
* 後續攻擊者若針對特定 Modbus 地址範圍進行掃描或操作,即可誘發異常行為,進而被本系統檢測並隔離。
## mbtget 工具
mbtget 是Linux/Unix 系統上常見的一個工具,用於 Modbus 通訊協定中的命令行工具,用於從 Modbus 設備讀取數據。
以我們的環境為例子:
將紅燈打開
```
mbtget -u 3 -a 1 -w5 1 -p 5020 192.168.14.149
```
短時間快速傳送封包
```
for i in {1..10}; do mbtget -u 3 -a 1 -w5 1 -p 5020 192.168.14.149; sleep 0.5; done
```
* -u 3:目標 Slave ID。
* -a 1:Modbus 地址: 1(紅燈)/2 (綠燈)
* -w5 1:寫入值 1,表示開啟 / 寫入值 0,表示關閉。
* -p 5020:使用自訂埠號 5020。
---
# HMI介面
FUXA 是一個開源的 SCADA/HMI Web 應用程式,用來監控與控制工業自動化設備。它可以透過網頁介面建立與操作監控畫面(如儀表板、動畫、報警、數據紀錄等),支援多種通訊協定(如 Modbus、OPC UA、MQTT)。
示意圖:

| PLC名稱 | Slave ID | Starting Address (位址) |
| :------------------- | :------- | :-------------------- |
| PLC-1-traffic-light | 3 | 1、2 |
| PLC-2-level-crossing | 4 | 3 |
| PLC-2-turnout | 5 | 4 |
## 動態產生的FUXA與conpot連線
複製新 FUXA 的 image 檔案
```
docker commit thirsty_varahamihira new_fuxa
```
創立新的 network
```
docker network create --driver bridge isolation
```
確認成功創建
```
docker network ls
```
啟動新FUXA容器
```
docker run -d --name new_fuxa_container --network isolation -p 1882:1881 new_fuxa
```
---
# 異常偵測
## 創建 `attacker_ips` 名單
創建這個名單,將各種偵測方式判斷出的異常IP統一集中在此,後續可以讓腳本自動偵測此名單內的IP來自動啟動新容器
```bash
sudo ipset create attacker_ips hash:ip
```
- 這個 `attacker_ips` 集合會存放 **攻擊者的 IP**
查看 ipset 名單
```
sudo ipset list attacker_ips
```

## 三種異常偵測方法
### (1)iptables
iptables為Linux系統中的防火牆,可以藉此來達到異常偵測
範例:
```
sudo iptables -I DOCKER-USER 1 -p tcp --dport 5020 -m recent --set --name MODBUS_ATTACK
sudo iptables -I DOCKER-USER 2 -p tcp --dport 5020 -m recent --update --seconds 10 --hitcount 5 --name MODBUS_ATTACK -j SET --add-set attacker_ips src
sudo iptables -A DOCKER-USER -j RETURN # 確保其他流量繼續處理
```
1. 第一條規則
* 當有新的 TCP 連接嘗試訪問 5020 端口時,將來源 IP 記錄到 `MODBUS_ATTACK` 列表,並標記時間戳
2. 第二條規則
* 監控 `MODBUS_ATTACK` 列表中的 IP,如果某個 IP 在 60 秒內超過 5 次連接嘗試,就將其加入 `attacker_ips` 集合。
### (2)gemini API
將流量轉成文字格式後,交給 LLM(大語言模型)分析,根據語意判斷是否為攻擊、類型、來源 IP。
* 前置作業
- gemini API KEY
- 安裝 google-generativeai `pip install google-generativeai`
1. 錄製流量
```
def capture_traffic(interface, duration, output_pcap):
"""使用 Wireshark 監控網路流量並儲存為 pcap 檔案。"""
try:
subprocess.run(["tshark", "-i", interface, "-a", f"duration:{duration}", "-w", output_pcap], check=True)
print(f"流量監控完成,檔案儲存於:{output_pcap}")
return True
except subprocess.CalledProcessError as e:
print(f"流量監控失敗:{e}")
return False
```
2. 轉換.pcap檔成.txt
```
def pcap_to_text(pcap_file, text_file):
"""將 pcap 檔案轉換為純文字檔案。"""
try:
with open(text_file, 'w') as f:
subprocess.run([
"tshark", "-r", pcap_file, "-T", "fields",
"-e", "frame.number",
"-e", "frame.time",
"-e", "ip.src",
"-e", "ip.dst",
"-e", "frame.protocols",
"-e", "frame.len",
"-e", "tcp.srcport",
"-e", "tcp.dstport",
"-e", "tcp.flags",
"-e", "udp.srcport",
"-e", "udp.dstport"
], stdout=f, check=True)
print(f"pcap 檔案轉換完成,檔案儲存於:{text_file}")
return True
except subprocess.CalledProcessError as e:
print(f"pcap 檔案轉換失敗:{e}")
return False
```
- 說明
::: spoiler
* frame.number:封包編號。
* frame.time:封包時間。
* ip.src:來源 IP 位址。
* ip.dst:目的 IP 位址。
* frame.protocols:封包協定。
* frame.len:封包長度。
* tcp.srcport:TCP 來源埠號。
* tcp.dstport:TCP 目的埠號。
* tcp.flags:TCP 旗標。
* udp.srcport:UDP 來源埠號。
* udp.dstport:UDP 目的埠號。
:::
3. 選擇模型
* 可以參考[這邊](**https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models**)
4. 輸入prompt
個人認為`只需要回答我這三個問題,不需要回答其他解釋`很重要,否則他會跟你阿哩阿扎說很多解釋,盡可能標準化輸出結果
```
prompt = f"""
根據以下的輸出,回答我以下3個問題,只需要回答我這三個問題,不需要回答其他解釋
1.是否為攻擊行為(ping請判定為攻擊行為)?(回答是/否)
2.是哪一種攻擊(若1為否則回答無攻擊)
3.攻擊者IP
{text_data}
"""
```
### (3)Machine Learning
自行訓練的模型,用於分析流量模式(例如特徵向量、時間序列)判斷異常。
* 透過滑動時間窗口(近 30 秒內)收集封包。
* 把封包特徵(像protocol、function code)轉成一組文字 token。
* 用事先訓練好的 Word2Vec 模型把這組 token 轉成向量。
* 計算這組向量跟「正常行為資料庫」的相似度。
* 如果相似度低於某個閾值(0.93),就判定為異常,並且把該來源 IP 加到 ipset 黑名單。

## 三個檢測方法之優先順序
1. **iptables** : **最快速、最低誤判率**:很適合當第一層防禦,只要超過連線次數就馬上抓,不需要分析、延遲極低。作為初步擋住明顯的暴力攻擊。
2. **機器學習模型** : **適中速度,有泛化能力**:可以偵測到比較隱藏的異常模式(像是慢速掃描、微量滲透),但要注意模型誤判(False Positive/False Negative)問題。
3. **Gemini API** : **精緻但延遲高**:語言模型的分析比較細膩,可以辨識複雜攻擊行為,但是轉檔、丟API、回應有延遲,不適合第一時間阻擋。比較適合做「**第二層確認**」或「**事件深度分析**」。
**多層次防禦流程**
1. 先用 iptables 快速擋住 70% 的明顯攻擊。
1. 剩下進來的流量,再丟給 **機器學習模型** 判別。
1. 如果機器學習模型標記為可疑,平行開啟 Gemini分析做進一步判讀、分類、紀錄證據。
---
# 流量移轉
## (1)新容器與合法環境的一致性
為了讓攻擊無法察覺自己的流量被移轉,必須產生一個與合法環境一模一樣的環境
**透過 docker commit,將合法環境保存為新的映像 conpot_clean**
```
docker commit legit_conpot conpot_clean
```
* `legal_conpot`為合法環境
* `conpot_clearn`為新的映像檔
---
## (2)流量移轉--iptables
透過iptables來寫規則,將異常的IP記錄在先前創立的`attacker_ips`
```
sudo iptables -I DOCKER-USER 1 -p tcp --dport 5020 -m recent --set --name MODBUS_ATTACK
sudo iptables -I DOCKER-USER 2 -p tcp --dport 5020 -m recent --update --seconds 10 --hitcount 5 --name MODBUS_ATTACK -j SET --add-set attacker_ips src
sudo iptables -A DOCKER-USER -j RETURN
```
* 規則一:當有新的 TCP 連接嘗試訪問 5020 端口時,將來源 IP 記錄到 `MODBUS_ATTACK` 列表,並標記時間戳
* 規則二:監控 `MODBUS_ATTACK` 列表中的 IP,如果某個 IP 在 60 秒內超過 5 次連接嘗試,就將其加入 `attacker_ips` 集合。
### 封包處理流程
查看iptables規則
```
sudo iptables -t nat -L -n -v
```
當封包從外部(例如 Kali 的 192.168.14.139)發送到你的主機(192.168.14.149:5020)時,經過以下階段:
**1. 進入系統**
* 封包首先到達 PREROUTING 鏈(nat 表)。
* 在這裡,Docker 的 NAT 規則(DOCKER 鏈)會檢查是否需要將封包轉換為容器地址(例如 172.17.0.2:5020)。
* 例如:
```
DNAT tcp -- !docker0 * 0.0.0.0/0 0.0.0.0/0 tcp dpt:5020 to:172.17.0.2:5020
```
**2. 路由決定**
* 系統決定封包是發往本機(INPUT)還是轉發(FORWARD)。
* 由於 Docker 使用 NAT,封包的目的地址被改為 172.17.0.2:5020,路由會選擇 FORWARD
**3. 轉發階段**
* 封包進入 FORWARD 鏈(filter 表)
* 在這裡,Docker 設置了規則:
```
DOCKER-USER 0 -- * * 0.0.0.0/0 0.0.0.0/0
```
* DOCKER-USER 是 FORWARD 鏈的第一步,我們的異常檢測規則在這裡。
**4. 最終轉發**
* 如果 DOCKER-USER 允許(RETURN),封包繼續到 DOCKER 鏈(nat 表),完成轉發
---
## (3)以個別IP啟動各自的新容器
目的是為了方便後須的分析,一個容器只會紀錄該使用者的紀錄
```
# 針對每個攻擊者 IP 啟動 Conpot 蜜罐
if [ -n "$ATTACKER_IPS" ]; then
for IP in $ATTACKER_IPS; do
CONPOT_NAME="conpot_$IP"
CONPOT_NAME=$(echo "$CONPOT_NAME" | tr '.' '_')
BACKUP_PATH="$BACKUP_DIR/$CONPOT_NAME"
```
* 新容器名稱以IP命名
```
docker run --name "$CONPOT_NAME" \
-v /home/bbrain/conpot_logs/new_machine:/var/log/conpot \
-v "$BACKUP_PATH":/conpot/data \
-p "$PORT_5020:5020" \
-p "$PORT_161:161/udp" \
-p "$PORT_20000:20000" \
-d conpot_clean \
/home/conpot/.local/bin/conpot \
--template /home/conpot/.local/lib/python3.6/site-packages/conpot-0.6.0-py3.6.egg/conpot/templates/default/ \
--config /home/conpot/.local/lib/python3.6/site-packages/conpot-0.6.0-py3.6.egg/conpot/conpot.cfg
```
* 啟動新容器
* `-v /home/bbrain/conpot_logs/new_machine:/var/log/conpot`將新容器的log檔映射到ubuntu本機上,以利後須做日誌集中管理
---
# 日誌集中管理
我們原先採用較有名的ELK,但是實在太耗系統資源了,原是採用輕量級的**Loki + Promtail + Grafana**
* Loki跟Grafana安裝在ubuntu(二)上
* Protail安裝在ubuntu(一)上
在ubuntu(一)中可以在`promtail-config.yaml`設定要推送到Loki的日誌
```
- job_name: "Attacker_logs"
static_configs:
- targets:
- localhost
labels:
job: "Attacker_log"
host: "conpot_server"
__path__: "/home/bbrain/conpot_logs/new_machine/conpot.json"
```
設定好之後執行`./promtail-linux-amd64 -config.file=promtail-config.yaml &`就可以在ubuntu(二)的瀏覽器`http://<ubuntu(二)IP>:3100/ready`查看日誌
## grafana mcp server
**讓 grafana 與 mcp server 串起來更方便logs的分析**
1. 安裝go
2. 安裝 mcp-grafana
3. 安裝 MCP-Host 端 (claude desktop)
4. 取得grafana API KEY
5. 編輯 cluade desktop 設定檔
詳細內容看[這邊](https://hackmd.io/@BBrain/ryL-6Uk0kl)
---
# Demo
1.[iptable偵測+流量移轉](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EVlNg0ygwddNpeZ_SCbiiDQBJH4yrvhByI0xOKOFt_IDkA?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=czdKJv)
2.[Gemini API 偵測+流量移轉](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EZu3ngp3UyVGlLKNstgYueEBzwcxT-q8gTndz8EA_IrAkg?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=hhb477)
3.[機器學習模型偵測](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EcGGsKGGxtVGs-y3i2uN4okBxutKd5D7x5oFm_xOUGbbOg?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=szkocg)
4.[在Claude上透過MCP分析事件](https://fjuedu-my.sharepoint.com/:v:/g/personal/411580142_m365_fju_edu_tw/EeOBRVrBPpxJqfzfQXvGmEUBLFeXOFUDkvZw-t0lZPWX6g?nav=eyJyZWZlcnJhbEluZm8iOnsicmVmZXJyYWxBcHAiOiJPbmVEcml2ZUZvckJ1c2luZXNzIiwicmVmZXJyYWxBcHBQbGF0Zm9ybSI6IldlYiIsInJlZmVycmFsTW9kZSI6InZpZXciLCJyZWZlcnJhbFZpZXciOiJNeUZpbGVzTGlua0NvcHkifX0&e=tT7pex)
---
# Github
1. [專題程式碼](https://github.com/BBrain778/ICS_conpot)
1. [Conpot開源專案](https://github.com/mushorg/conpot)
2. [FUXA開源碼網頁式圖控平台](https://github.com/frangoteam/FUXA)