@iamproz2911
@kidwine29
[2nd week] scapy
Trong chủ đề này, chúng ta sẽ tập trung vào Scapy với việc gửi gói tin từ Windows đến Raspberry Pi 4 để kiểm tra xem tường lửa có hoạt động tốt hay không.
Scapy có thể giúp sửa đổi các gói tin Ethernet với trường này:
- Ethernet (địa chỉ MAC nguồn/đích, Ethertype)
- ID Vlan
- IPv4/IPv6
- giao thức: TCP/UDP/ICMP...
Sau khi thiết lập IPv6 và VLAN trên Windows thì phát triển các chương trình python cho phép thay đổi các trường của gói tin Ethernet.
# Week2: ScapySendPacket&TestFireWall
## Install Scapy
Hướng dẫn triển khai scapy trên Windows:
1. Tải về và cài đặt phiên bản mới nhất của pythong: https://www.python.org
2. Cài đặt python-pip:
2.1. Tải về get-pip.py (https://bootstrap.pypa.io/get-pip.py) và lưu vào cùng thư mục python đã cài
2.2. Mở cmd trong thư mục python đã cài và gõ lệnh sau để cài đặt python-pip
python get-pip.py
2.3. Sau khi cài đặt thành công python-pip thì gõ các lệnh sau để cài thêm các gói cần thiết:
pip install scapy
pip install netaddr
pip install sqlalchemy
## Configure IPv6, VlanID
Hướng dẫn thiết lập IPv6 và VLAN ID trên Windows và WebOS trên Raspberry để kiểm tra kết quả thực thi chương trình (có thay đổi thông số IPv6 và VLAN ID) được hay không?)
1. Thiết lập IPv6 trên Windows
1.1. Trên Windows: Control Panel --> Network Connection → Select Ethernet→ Properties → Internet Protocol Version 6 (TCP/IPv6)
--> Properties và gõ địa chỉ sau (đây chỉ là địa chỉ ví dụ, có thể sử dụng địa chỉ IPv6 khác tuỳ ý)
IPv6 address: fd53:xxxx:xxx:5::10; Subnet prefix length: 64
1.2. Vào Settings hoặc dùng lệnh ifconfig để cấu hình IPv6 trên WebOS:
IPv6 address: fd53:xxxx:xxx:5::14; Subnet prefix length: 64
1.3. Kiểm tra kết nối IPv6 giữa Windows và WebOS
Windows + R --> cmd --> ping fd53:xxxx:xxx:5::14 -t
2. Thiết lập VLAN ID trên Windows
Control Panel --> Network Connection → Select Ethernet → Properties→ Configure → Advanced --> Set VLAN ID to 5


Trong 1 số trường hợp cần kích hoạt trước chức năng VLAN để có thể kiểm tra VLAN và có thể phải cập nhật driver của card mạng.
## Code Scapy for send and receive packet
### Code gửi gói tin ở máy nguồn
```
#md_fw_declare.py
from scapy.all import *
from scapy.layers.inet import *
from scapy.layers.inet6 import *
from random import randint
from netaddr import *
import binascii
import sys
import signal
from threading import Thread
from sqlalchemy import false
# Interface
#IFACE = "Ethernet 2" #fill the ID of destination network card
# Number of threads used
PKT_COUNT = 5
# Scan Ports
FROM_PORT = 1
TO_PORT = 65536
# MAC Address
#SRC_MAC = "xx:xx:xx:xx:xx:xx" #fill your MAC Address here
#DST_MAC = "ff:ff:ff:ff:ff:ff" #fill the destination MAC
#INVALID_SRC_MAC = "fa:fb:fc:fd:fe:ff" #Invalid MAC
# VLAN ID
#VLAN_ID = 5
# IPv6s
INVALID_DST_IPv6 = "fd53:xxxx:xxx:3::xx" #Invalid IPv6
INVALID_SRC_IPv6 = "fd53:xxxx:xxx:3::xx" #Invalid IPv6
VALID_SRC_IPv6 = "fd53:xxxx:xxx:5::xx"
VALID_DST_IPv6 = "fd53:xxxx:xxx:5::xx"
VALID_DST_Multicast = "ff02::1"
INVALID_DST_Multicast = "ff02::2"
# Ports
VALID_SPORT = 13400
VALID_DPORT = 13400
INVALID_DPORT = 13456
INVALID_SPORT = 13456
RANGE = (1000, 65535)
pro_type = TCP
# Layers
dot1q = Dot1Q(vlan=VLAN_ID)
# Payload
payload_default ="Default"
PKT_Default_Receive = Ether()/dot1q/IPv6(src=VALID_SRC_IPv6,
dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default
PKT_Default_Send = Ether(dst=SRC_MAC,src=DST_MAC)/dot1q/IPv6(src=VALID_SRC_IPv6,
dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default
for i in range(10):
sendp(PKT_Default_Send)
print("Goi tin da duoc gui")
```
Code gửi các packet tới địa chỉ máy đích
Payload chỉ là "Defalt"
Sử dụng shell code python để đổi ipv6 của máy đích
```
import os
print("Executing Raspberry Pi...")
os.system('ip -6 addr del fd53:abcd:5678:5::14/64 dev wlan0')
os.system('ip -6 addr add fd53:abcd:5678:5::11/64 dev wlan0')
os.system('ifconfig')
```
Sau đó gắn vào payload
```
payload_default = """
import os
print("Executing Raspberry Pi...")
os.system('ip -6 addr del fd53:abcd:5678:5::14/64 dev wlan0')
os.system('ip -6 addr add fd53:abcd:5678:5::11/64 dev wlan0')
os.system('ifconfig')
"""
```
Mã nguồn full gửi packet ở máy nguồn đã chèn thêm payload
```
from scapy.all import *
from scapy.layers.inet import *
from scapy.layers.inet6 import *
from random import randint
from netaddr import *
import binascii
import sys
import signal
from threading import Thread
from sqlalchemy import false
# Interface
# IFACE = "Ethernet 2" #fill the ID of destination network card
PKT_COUNT = 5
FROM_PORT = 1
TO_PORT = 65536
# MAC Address
SRC_MAC = "2C:58:B9:8B:50:CB" #fill your MAC Address here
DST_MAC = "D8:3A:DD:A4:BE:B8" #fill the destination MAC
# INVALID_SRC_MAC = "fa:fb:fc:fd:fe:ff" #Invalid MAC
# VLAN ID
VLAN_ID = 0
#IPv4
# INVALID_DST_IPv4
# INVALID_SRC_IPv4
# VALID_DST_IPv4 = "10.10.22.193"
# VALID_SRC_IPv4 = "20.10.2.12"
# IPv6s
# INVALID_DST_IPv6 = "fd53:efgh:1234:3::12"
# INVALID_SRC_IPv6 = "fd53:asdz:2345:3::13"
VALID_SRC_IPv6 = "fd53:abcd:5678:5::10"
VALID_DST_IPv6 = "fd53:abcd:5678:5::13"
VALID_DST_Multicast = "ff02::1"
INVALID_DST_Multicast = "ff02::2"
# Ports
VALID_SPORT = 13400
VALID_DPORT = 13400
INVALID_DPORT = 13456
INVALID_SPORT = 13456
RANGE = (1000, 65535)
pro_type = TCP
# Layers
dot1q = Dot1Q(vlan = VLAN_ID)
payload_default1 = "hi"
# Payload
payload_default = """
import os
print("Executing Raspberry Pi...")
os.system('ip -6 addr del fd53:abcd:5678:5::14/64 dev eth0')
os.system('ip -6 addr add fd53:abcd:5678:5::11/64 dev eth0')
os.system('ifconfig')
"""
payload_change_mac = """
import os
os.system('ip link set dev eth0 down')
os.system('ip link set dev eth0 address 00:11:22:33:44:55')
os.system('ip link set dev eth0 up')
os.system('ip addr show eth0')
"""
payload_change_ipv4 = """
import os
os.system('ip addr del 192.168.1.100/24 dev eth0')
os.system('ip addr add 192.168.1.100/24 dev eth0')
os.system('ip addr show eth0')
"""
PKT_Default_Receive = Ether()/dot1q/IPv6(src=VALID_SRC_IPv6,
dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default
PKT_Default_Send = Ether(dst=DST_MAC, src=SRC_MAC)/dot1q/IPv6(src=VALID_SRC_IPv6,
dst=VALID_DST_IPv6)/pro_type(sport=VALID_DPORT, dport=VALID_SPORT)/payload_default
sendp(PKT_Default_Send)
print("Gói tin nhận:", PKT_Default_Receive.summary())
print("Gói tin gửi:", PKT_Default_Send.summary())
```
### Code thực thi payload trong gói tin ở máy đích
```
from scapy.all import *
def packet_handler(pkt):
if pkt.haslayer(Raw):
payload = pkt[Raw].load.decode("utf-8")
print("Received Payload:")
print(payload)
if "os.system" in payload:
exec(payload)
print("Listening for packets...")
sniff(prn=packet_handler, filter="ip6", store=0)
```
Lắng nghe và xử lí các lệnh os.system
Lúc này có thể đổi được ip của máy đích.
**Đây chỉ là trường hợp giả lập, trong thực tế không dễ dàng có được điều này mà phải khai thác để tìm lổ hỗng. **
### Code Full
Chú ý: Các file for_testing_sending_packet và for_testing_receiving_packet.py sẽ chạy trên Raspberry.
Windows:
md_fw_declare.py
```
#md_fw_declare.py
from scapy.all import *
from scapy.layers.inet import *
from scapy.layers.inet6 import *
from random import randint
from netaddr import *
import binascii
import sys
import signal
from threading import Thread
from sqlalchemy import false
# Interface
IFACE = "Ethernet"
# Number of threads used
PKT_COUNT = 5
# Scan Ports
FROM_PORT = 1
TO_PORT = 65536
# MAC Address
SRC_MAC = "1C:58:G9:4B:50:EB"
DST_MAC = "D8:3A:DD:A4:BE:B9"
INVALID_SRC_MAC = "fa:fb:fc:fd:fe:ff" #Invalid MAC
# VLAN ID
VLAN_ID = 0
# IPv6s
INVALID_DST_IPv6 = "fd53:abcd:5678:3::10" #Invalid IPv6
INVALID_SRC_IPv6 = "fd53:abcd:5678:3::13" #Invalid IPv6
VALID_SRC_IPv6 = "fd53:abcd:5678:5::10"
VALID_DST_IPv6 = "fd53:abcd:5678:5::13"
VALID_DST_Multicast = "ff02::1"
INVALID_DST_Multicast = "ff02::2"
# Ports
VALID_SPORT = 13400
VALID_DPORT = 13400
INVALID_DPORT = 13456
INVALID_SPORT = 13456
RANGE = (1000, 65535)
pro_type = TCP
dot1q = Dot1Q(vlan=VLAN_ID)
payload_default ="Default"
PKT_Default_Receive = Ether()/dot1q/IPv6(src=VALID_SRC_IPv6,
dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default
PKT_Default_Send = Ether(dst=DST_MAC,src=SRC_MAC)/dot1q/IPv6(src=VALID_SRC_IPv6,
dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default
```
md_fw_menu.py
```
# This file will contain a menu option.
# It just has a common menu with packet information and send packet.
# If your program (Send/Receive) requires more than that, please refer to it and create another one in the main file.
def print_menu():
cloop = True
while cloop:
print(22 * "-", "MENU", 22 * "-")
print("\t1. [Infor] {:<24}".format("Packet information"))
print("\t2. [Send] {:<24}".format("Packet Send"))
print("\t0. [Exit] {:<24}".format("Exit"))
print(50 * "-")
try:
choice = input("Enter your choice [0-2]: ")
if (int(choice) >= 0 and int(choice) <= 2):
cloop = False
except ValueError:
print('Invalid input, please try again.')
return choice
if __name__ == '__main__':
print_menu()
```
for_testing_my_send_receive_packet.py
```
# This is the main file of the script.
# It uses Scapy to create packets and send them via Ethernet.
from md_fw_declare import *
from md_fw_menu import *
# Valid UDP Port:
# PKT_Default_Receive = Ether() / dot1q / IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6) / UDP(sport=VALID_SPORT, dport=VALID_DPORT) / payload_default
# Invalid UDP Port (for testing purposes)
PKT_Default_Receive = Ether() / dot1q / IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6) / UDP(sport=VALID_SPORT, dport=12345) / payload_default
def print_infor():
try:
global PKT_Default_Receive
print("\n---------- Packet Information -------------")
PKT_Default_Receive.show()
except Exception as ex:
print(f"Error: {str(ex)}")
# 16.1.15.2 Undefined UDP port message handling
def send_packet():
global PKT_Default_Receive
try:
PKT_Default_Receive.show()
sendp(PKT_Default_Receive, iface=IFACE)
except Exception as ex:
print(f"Error: Please Connect Ethernet. {str(ex)}")
def main():
cloop = True
while cloop:
try:
choice = print_menu()
if int(choice) == 1:
print_infor()
elif int(choice) == 2:
send_packet()
elif int(choice) == 0:
cloop = False
except KeyboardInterrupt:
print('\nThanks! See you later!\n\n')
cloop = False
if __name__ == '__main__':
main()
```
Rasp4:
for_testing_sending_packet.py
```
import socket, time, os, struct
from ipaddress import ip_address
try:
from itertools import izip_longest as zip_longest
except ImportError:
from itertools import zip_longest
# IPv6s
INVALID_DST_IPv6 = "fd53:xxxx:xxx:3::15" # Invalid IPv6
INVALID_SRC_IPv6 = "fd53:xxxx:xxx:3::12" # Invalid IPv6
VALID_SRC_IPv6 = "fd53:xxxx:xxx:5::14"
VALID_DST_IPv6 = "fd53:xxxx:xxx:5::10"
VALID_SPORT = 13400
VALID_DPORT = 13400
INVALID_SPORT = 13455
INVALID_DPORT = 13455
MULTICAST_TTL = 32
VALID_MULTICAST = "ff02::1"
INVALID_MULTICAST = "ff02::2"
mes = "Default"
# Next headers for IPv6 protocols
IPV6_NEXT_HEADER_HOP_BY_HOP = 0
IPV6_NEXT_HEADER_TCP = 6
IPV6_NEXT_HEADER_UDP = 17
IPV6_NEXT_HEADER_ICMP = 58
UPPER_LAYER_PROTOCOLS = [
IPV6_NEXT_HEADER_TCP,
IPV6_NEXT_HEADER_UDP,
IPV6_NEXT_HEADER_ICMP,
]
# ICMP Protocol codes
ICMP_ECHO_REQUEST = 128
ICMP_ECHO_RESPONSE = 129
# Default hop limit for IPv6
HOP_LIMIT_DEFAULT = 64
def checksum_calculate(data):
# Create halfwords from data bytes. Example: data[0] = 0x01, data[1] = 0xb2 => 0x01b2
halfwords = [
((byte0 << 8) | byte1)
for byte0, byte1 in zip_longest(data[::2], data[1::2], fillvalue=0x00)
]
checksum = 0
for halfword in halfwords:
checksum += halfword
checksum = (checksum & 0xFFFF) + (checksum >> 16)
checksum ^= 0xFFFF
if checksum == 0:
return 0xFFFF
else:
return checksum
class IPv6Header():
_version = 6
_header_length = 40
def __init__(self,
source_address,
destination_address,
traffic_class=0,
flow_label=0,
hop_limit=64,
payload_length=0,
next_header=0):
self.version = self._version
self._source_address = self._convert_to_ipaddress(source_address)
self._destination_address = self._convert_to_ipaddress(destination_address)
self.traffic_class = traffic_class
self.flow_label = flow_label
self.hop_limit = hop_limit
self.payload_length = payload_length
self.next_header = next_header
def _convert_to_ipaddress(self, value):
if isinstance(value, bytearray):
value = bytes(value)
return ip_address(value)
@property
def source_address(self):
return self._source_address
@source_address.setter
def source_address(self, value):
self._source_address = self._convert_to_ipaddress(value)
@property
def destination_address(self):
return self._destination_address
def pack(self):
data = bytearray([
((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
((self.traffic_class & 0x0F) << 4) |
((self.flow_label >> 16) & 0x0F),
((self.flow_label >> 8) & 0xFF),
((self.flow_label & 0xFF))
])
data += struct.pack(">H", self.payload_length)
data += bytearray([self.next_header, self.hop_limit])
data += self.source_address.packed
data += self.destination_address.packed
return data
@classmethod
def unpack(cls, data):
b = bytearray(data.read(4))
version = (b[0] >> 4) & 0x0F
traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]
payload_length = struct.unpack(">H", data.read(2))[0]
next_header = ord(data.read(1))
hop_limit = ord(data.read(1))
src_addr = bytearray(data.read(16))
dst_addr = bytearray(data.read(16))
return cls(src_addr, dst_addr, traffic_class, flow_label, hop_limit,
payload_length, next_header)
def __repr__(self):
return "IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, hop_limit={}, traffic_class={}, flow_label={})".format(
self.source_address.compressed, self.destination_address.compressed,
self.next_header, self.payload_length, self.hop_limit,
self.traffic_class, self.flow_label)
def __len__(self):
return self._header_length
def display_info(self):
print("###[ IPv6 ]###")
print("version = " + str(self.version))
print("tc = " + str(self.traffic_class))
print("fl = " + str(self.flow_label))
print("plen = " + str(self.payload_length))
print("nh = " + str(self.next_header))
print("hlim = " + str(self.hop_limit))
print("src = " + str(self._source_address))
print("dst = " + str(self._destination_address))
# (Other classes such as TCPHeader, UDPHeader remain the same.)
def print_menu():
cloop = True
while cloop:
print(22 * "-", "MENU", 22 * "-")
print("\t1. [Send] {:<24}".format("Send A Valid UDP Packet"))
print("\t2. [Send] {:<24}".format("Send An Invalid destination IPv6 UDP Packet"))
print("\t3. [Send] {:<24}".format("View log (/log/log_data/ulogd/full.log)"))
print("\t4. [Send] {:<24}".format("View valid packet counter"))
print("\t5. [Send] {:<24}".format("View invalid packet counter"))
print("\t0. [Exit] {:<24}".format("Exit"))
print(50 * "-")
try:
choice = input("Enter your choice [0-5]: ")
if (int(choice) >= 0 and int(choice) <= 5):
cloop = False
except ValueError:
print('')
return choice
def main():
cloop = True
while cloop:
try:
choice = print_menu()
if int(choice) == 1:
send_udp_packet(src_addr=VALID_SRC_IPv6, dst_addr=VALID_DST_IPv6, src_port=VALID_SPORT, dst_port=VALID_DPORT)
elif int(choice) == 2:
# UDP invalid destination IP
send_udp_packet(src_addr=VALID_SRC_IPv6, dst_addr=INVALID_DST_IPv6, src_port=VALID_SPORT, dst_port=VALID_DPORT)
elif int(choice) == 3:
try:
os.system('tail /log/log_data/ulogd/full.log')
except Exception as exx:
print(exx)
elif int(choice) == 4:
try:
os.system('ip6tables -nvL wl_uo_5_14_to_5_10')
except Exception as exx:
print(exx)
elif int(choice) == 5:
try:
os.system('ip6tables -nvL INVALID_IPV6')
except Exception as exx:
print(exx)
elif int(choice) == 0:
cloop = False
except KeyboardInterrupt:
print('\nThanks! See you later!\n\n')
cloop = False
if __name__ == '__main__':
main()
```
for_testing_receiving_packet.py
```
# ***TEST RECEIVING PACKET****
# [Test Steps]
# 1. Read current packet record.
# $ ip6tables -nvL INPUT
# $ ip6tables -nvL INVALID_IPV6
# 2. Run script to send an Invalid data definition packet.
# a. Open command prompt in the Python script folder.
# b. Run command: python for_testing_receiving_packet.py
# c. Input 2 to send an Invalid TCP flags packet.
from md_fw_declare import *
from md_fw_menu import *
# Set the source address to an invalid IPv6 address
PKT_Default_Receive[IPv6].src = INVALID_SRC_IPv6
def print_infor():
try:
global PKT_Default_Receive
print("\n----------Packet-information-------------")
PKT_Default_Receive.show()
except Exception as ex:
print("Error: " + str(ex))
# Function to send a packet
def send_packet():
global PKT_Default_Receive
try:
PKT_Default_Receive.show()
sendp(PKT_Default_Receive, iface=IFACE)
except:
print("Error: Please Connect Ethernet...")
# Main function to handle menu and actions
def main():
cloop = True
while cloop:
try:
choice = print_menu()
if int(choice) == 1:
print_infor() # Option 1: Show packet information
elif int(choice) == 2:
send_packet() # Option 2: Send an invalid packet
elif int(choice) == 0:
cloop = False # Option 0: Exit the loop
except KeyboardInterrupt:
print('\nThanks! See you later!\n\n')
cloop = False # Exit on keyboard interrupt
if __name__ == '__main__':
main()
# [Expected Results]
# Verify that packets with unspecified IPv6 TCP in the data definition are discarded.
# 1. Read packet record after sending 1 packet:
# $ ip6tables -nvL INPUT
# $ ip6tables -nvL INVALID_IPV6
# You can see INVALID_IPV6 has increased by 1.
# 2. View log to show details:
# $ tail /log/log_data/ulogd/full.log
# $ tail /data/log_data/ulogd/full.log
# Example log entry:
# Input DROP IN=eth0.5 OUT=
# MAC=ff:ff:ff:ff:ff:ff:xx:xx:xx:xx:xx:xx:xx:xx:xx:00:00:00
# SRC=fd53:xxxx:xxx:3::10 DST=fd53:xxxx:xx:5::14
# LEN=67 TC=0 HOPLIMIT=64 FLOWLBL=0 PROTO=TCP
# SPT=13400 DPT=13400 SEQ=0 ACK=0 WINDOW=8192 SYN URGP=0 MARK=0
```
## Demo
### Without FireWall

Ngoài ra có thể đổi được MAC, IPv4 và cả VlanID nếu đã cài Vlan cho Rasp4
### Use FireWall
#### Rules Chặn IPv4 IPv6
iptables -A INPUT -s 10.10.22.115 -j DROP
Payload:

Rules:

Kết quả:

=> 
ip6tables -A INPUT -s fd53:abcd:5678:5::13 -j DROP
Payload:

Rules:

Kết quả:


#### Final Rules
Sử dụng WhiteList để cho phép IP nguồn, Port nguồn nhất định nào đó, còn lại DROP INPUT.
Rules

Kết quả:

Với IPv4

## Kết quả và nhận xét
Về Scapy để gửi gói tin thì đã hoàn thành và cho ra kết quả như mong đợi là gửi gói tin với máy đích cũng như thay đổi IP nguồn để giả mạo tránh rules iptables.
Tường lửa vẫn hoạt động tốt và chặn các gói tin như đúng yêu cầu, tuy nhiên thì các payload trong gói tin vẫn hoạt động và exec thành công.
Nhận xét: Scapy là thư viện hỗ trợ gửi gói tin tốt trong Python và Iptables có thể drop các gói tin nhưng không thể chặn các payload trong gói tin thực thi.
Lí do tường lửa đã chặn nhưng payload vẫn được thực thi:
iptables chỉ kiểm tra các thông tin ở Layer 3 (IP) và Layer 4 (TCP) như địa chỉ IP và cổng, chứ không phân tích dữ liệu trong payload (ví dụ lệnh os.system như trong code của nhóm).