@iamproz2911 @kidwine29 [2nd week] scapy Trong chủ đề này, chúng ta sẽ tập trung vào Scapy với việc gửi gói tin từ Windows đến Raspberry Pi 4 để kiểm tra xem tường lửa có hoạt động tốt hay không. Scapy có thể giúp sửa đổi các gói tin Ethernet với trường này: - Ethernet (địa chỉ MAC nguồn/đích, Ethertype) - ID Vlan - IPv4/IPv6 - giao thức: TCP/UDP/ICMP... Sau khi thiết lập IPv6 và VLAN trên Windows thì phát triển các chương trình python cho phép thay đổi các trường của gói tin Ethernet. # Week2: ScapySendPacket&TestFireWall ## Install Scapy Hướng dẫn triển khai scapy trên Windows: 1. Tải về và cài đặt phiên bản mới nhất của pythong: https://www.python.org 2. Cài đặt python-pip: 2.1. Tải về get-pip.py (https://bootstrap.pypa.io/get-pip.py) và lưu vào cùng thư mục python đã cài 2.2. Mở cmd trong thư mục python đã cài và gõ lệnh sau để cài đặt python-pip python get-pip.py 2.3. Sau khi cài đặt thành công python-pip thì gõ các lệnh sau để cài thêm các gói cần thiết: pip install scapy pip install netaddr pip install sqlalchemy ## Configure IPv6, VlanID Hướng dẫn thiết lập IPv6 và VLAN ID trên Windows và WebOS trên Raspberry để kiểm tra kết quả thực thi chương trình (có thay đổi thông số IPv6 và VLAN ID) được hay không?) 1. Thiết lập IPv6 trên Windows 1.1. Trên Windows: Control Panel --> Network Connection → Select Ethernet→ Properties → Internet Protocol Version 6 (TCP/IPv6) --> Properties và gõ địa chỉ sau (đây chỉ là địa chỉ ví dụ, có thể sử dụng địa chỉ IPv6 khác tuỳ ý) IPv6 address: fd53:xxxx:xxx:5::10; Subnet prefix length: 64 1.2. Vào Settings hoặc dùng lệnh ifconfig để cấu hình IPv6 trên WebOS: IPv6 address: fd53:xxxx:xxx:5::14; Subnet prefix length: 64 1.3. Kiểm tra kết nối IPv6 giữa Windows và WebOS Windows + R --> cmd --> ping fd53:xxxx:xxx:5::14 -t 2. Thiết lập VLAN ID trên Windows Control Panel --> Network Connection → Select Ethernet → Properties→ Configure → Advanced --> Set VLAN ID to 5 ![image](https://hackmd.io/_uploads/rkgPMVVVyx.png) ![image](https://hackmd.io/_uploads/rkrDf4NNJl.png) Trong 1 số trường hợp cần kích hoạt trước chức năng VLAN để có thể kiểm tra VLAN và có thể phải cập nhật driver của card mạng. ## Code Scapy for send and receive packet ### Code gửi gói tin ở máy nguồn ``` #md_fw_declare.py from scapy.all import * from scapy.layers.inet import * from scapy.layers.inet6 import * from random import randint from netaddr import * import binascii import sys import signal from threading import Thread from sqlalchemy import false # Interface #IFACE = "Ethernet 2" #fill the ID of destination network card # Number of threads used PKT_COUNT = 5 # Scan Ports FROM_PORT = 1 TO_PORT = 65536 # MAC Address #SRC_MAC = "xx:xx:xx:xx:xx:xx" #fill your MAC Address here #DST_MAC = "ff:ff:ff:ff:ff:ff" #fill the destination MAC #INVALID_SRC_MAC = "fa:fb:fc:fd:fe:ff" #Invalid MAC # VLAN ID #VLAN_ID = 5 # IPv6s INVALID_DST_IPv6 = "fd53:xxxx:xxx:3::xx" #Invalid IPv6 INVALID_SRC_IPv6 = "fd53:xxxx:xxx:3::xx" #Invalid IPv6 VALID_SRC_IPv6 = "fd53:xxxx:xxx:5::xx" VALID_DST_IPv6 = "fd53:xxxx:xxx:5::xx" VALID_DST_Multicast = "ff02::1" INVALID_DST_Multicast = "ff02::2" # Ports VALID_SPORT = 13400 VALID_DPORT = 13400 INVALID_DPORT = 13456 INVALID_SPORT = 13456 RANGE = (1000, 65535) pro_type = TCP # Layers dot1q = Dot1Q(vlan=VLAN_ID) # Payload payload_default ="Default" PKT_Default_Receive = Ether()/dot1q/IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default PKT_Default_Send = Ether(dst=SRC_MAC,src=DST_MAC)/dot1q/IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default for i in range(10): sendp(PKT_Default_Send) print("Goi tin da duoc gui") ``` Code gửi các packet tới địa chỉ máy đích Payload chỉ là "Defalt" Sử dụng shell code python để đổi ipv6 của máy đích ``` import os print("Executing Raspberry Pi...") os.system('ip -6 addr del fd53:abcd:5678:5::14/64 dev wlan0') os.system('ip -6 addr add fd53:abcd:5678:5::11/64 dev wlan0') os.system('ifconfig') ``` Sau đó gắn vào payload ``` payload_default = """ import os print("Executing Raspberry Pi...") os.system('ip -6 addr del fd53:abcd:5678:5::14/64 dev wlan0') os.system('ip -6 addr add fd53:abcd:5678:5::11/64 dev wlan0') os.system('ifconfig') """ ``` Mã nguồn full gửi packet ở máy nguồn đã chèn thêm payload ``` from scapy.all import * from scapy.layers.inet import * from scapy.layers.inet6 import * from random import randint from netaddr import * import binascii import sys import signal from threading import Thread from sqlalchemy import false # Interface # IFACE = "Ethernet 2" #fill the ID of destination network card PKT_COUNT = 5 FROM_PORT = 1 TO_PORT = 65536 # MAC Address SRC_MAC = "2C:58:B9:8B:50:CB" #fill your MAC Address here DST_MAC = "D8:3A:DD:A4:BE:B8" #fill the destination MAC # INVALID_SRC_MAC = "fa:fb:fc:fd:fe:ff" #Invalid MAC # VLAN ID VLAN_ID = 0 #IPv4 # INVALID_DST_IPv4 # INVALID_SRC_IPv4 # VALID_DST_IPv4 = "10.10.22.193" # VALID_SRC_IPv4 = "20.10.2.12" # IPv6s # INVALID_DST_IPv6 = "fd53:efgh:1234:3::12" # INVALID_SRC_IPv6 = "fd53:asdz:2345:3::13" VALID_SRC_IPv6 = "fd53:abcd:5678:5::10" VALID_DST_IPv6 = "fd53:abcd:5678:5::13" VALID_DST_Multicast = "ff02::1" INVALID_DST_Multicast = "ff02::2" # Ports VALID_SPORT = 13400 VALID_DPORT = 13400 INVALID_DPORT = 13456 INVALID_SPORT = 13456 RANGE = (1000, 65535) pro_type = TCP # Layers dot1q = Dot1Q(vlan = VLAN_ID) payload_default1 = "hi" # Payload payload_default = """ import os print("Executing Raspberry Pi...") os.system('ip -6 addr del fd53:abcd:5678:5::14/64 dev eth0') os.system('ip -6 addr add fd53:abcd:5678:5::11/64 dev eth0') os.system('ifconfig') """ payload_change_mac = """ import os os.system('ip link set dev eth0 down') os.system('ip link set dev eth0 address 00:11:22:33:44:55') os.system('ip link set dev eth0 up') os.system('ip addr show eth0') """ payload_change_ipv4 = """ import os os.system('ip addr del 192.168.1.100/24 dev eth0') os.system('ip addr add 192.168.1.100/24 dev eth0') os.system('ip addr show eth0') """ PKT_Default_Receive = Ether()/dot1q/IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default PKT_Default_Send = Ether(dst=DST_MAC, src=SRC_MAC)/dot1q/IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6)/pro_type(sport=VALID_DPORT, dport=VALID_SPORT)/payload_default sendp(PKT_Default_Send) print("Gói tin nhận:", PKT_Default_Receive.summary()) print("Gói tin gửi:", PKT_Default_Send.summary()) ``` ### Code thực thi payload trong gói tin ở máy đích ``` from scapy.all import * def packet_handler(pkt): if pkt.haslayer(Raw): payload = pkt[Raw].load.decode("utf-8") print("Received Payload:") print(payload) if "os.system" in payload: exec(payload) print("Listening for packets...") sniff(prn=packet_handler, filter="ip6", store=0) ``` Lắng nghe và xử lí các lệnh os.system Lúc này có thể đổi được ip của máy đích. **Đây chỉ là trường hợp giả lập, trong thực tế không dễ dàng có được điều này mà phải khai thác để tìm lổ hỗng. ** ### Code Full Chú ý: Các file for_testing_sending_packet và for_testing_receiving_packet.py sẽ chạy trên Raspberry. Windows: md_fw_declare.py ``` #md_fw_declare.py from scapy.all import * from scapy.layers.inet import * from scapy.layers.inet6 import * from random import randint from netaddr import * import binascii import sys import signal from threading import Thread from sqlalchemy import false # Interface IFACE = "Ethernet" # Number of threads used PKT_COUNT = 5 # Scan Ports FROM_PORT = 1 TO_PORT = 65536 # MAC Address SRC_MAC = "1C:58:G9:4B:50:EB" DST_MAC = "D8:3A:DD:A4:BE:B9" INVALID_SRC_MAC = "fa:fb:fc:fd:fe:ff" #Invalid MAC # VLAN ID VLAN_ID = 0 # IPv6s INVALID_DST_IPv6 = "fd53:abcd:5678:3::10" #Invalid IPv6 INVALID_SRC_IPv6 = "fd53:abcd:5678:3::13" #Invalid IPv6 VALID_SRC_IPv6 = "fd53:abcd:5678:5::10" VALID_DST_IPv6 = "fd53:abcd:5678:5::13" VALID_DST_Multicast = "ff02::1" INVALID_DST_Multicast = "ff02::2" # Ports VALID_SPORT = 13400 VALID_DPORT = 13400 INVALID_DPORT = 13456 INVALID_SPORT = 13456 RANGE = (1000, 65535) pro_type = TCP dot1q = Dot1Q(vlan=VLAN_ID) payload_default ="Default" PKT_Default_Receive = Ether()/dot1q/IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default PKT_Default_Send = Ether(dst=DST_MAC,src=SRC_MAC)/dot1q/IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6)/pro_type(sport=VALID_SPORT, dport=VALID_DPORT)/payload_default ``` md_fw_menu.py ``` # This file will contain a menu option. # It just has a common menu with packet information and send packet. # If your program (Send/Receive) requires more than that, please refer to it and create another one in the main file. def print_menu(): cloop = True while cloop: print(22 * "-", "MENU", 22 * "-") print("\t1. [Infor] {:<24}".format("Packet information")) print("\t2. [Send] {:<24}".format("Packet Send")) print("\t0. [Exit] {:<24}".format("Exit")) print(50 * "-") try: choice = input("Enter your choice [0-2]: ") if (int(choice) >= 0 and int(choice) <= 2): cloop = False except ValueError: print('Invalid input, please try again.') return choice if __name__ == '__main__': print_menu() ``` for_testing_my_send_receive_packet.py ``` # This is the main file of the script. # It uses Scapy to create packets and send them via Ethernet. from md_fw_declare import * from md_fw_menu import * # Valid UDP Port: # PKT_Default_Receive = Ether() / dot1q / IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6) / UDP(sport=VALID_SPORT, dport=VALID_DPORT) / payload_default # Invalid UDP Port (for testing purposes) PKT_Default_Receive = Ether() / dot1q / IPv6(src=VALID_SRC_IPv6, dst=VALID_DST_IPv6) / UDP(sport=VALID_SPORT, dport=12345) / payload_default def print_infor(): try: global PKT_Default_Receive print("\n---------- Packet Information -------------") PKT_Default_Receive.show() except Exception as ex: print(f"Error: {str(ex)}") # 16.1.15.2 Undefined UDP port message handling def send_packet(): global PKT_Default_Receive try: PKT_Default_Receive.show() sendp(PKT_Default_Receive, iface=IFACE) except Exception as ex: print(f"Error: Please Connect Ethernet. {str(ex)}") def main(): cloop = True while cloop: try: choice = print_menu() if int(choice) == 1: print_infor() elif int(choice) == 2: send_packet() elif int(choice) == 0: cloop = False except KeyboardInterrupt: print('\nThanks! See you later!\n\n') cloop = False if __name__ == '__main__': main() ``` Rasp4: for_testing_sending_packet.py ``` import socket, time, os, struct from ipaddress import ip_address try: from itertools import izip_longest as zip_longest except ImportError: from itertools import zip_longest # IPv6s INVALID_DST_IPv6 = "fd53:xxxx:xxx:3::15" # Invalid IPv6 INVALID_SRC_IPv6 = "fd53:xxxx:xxx:3::12" # Invalid IPv6 VALID_SRC_IPv6 = "fd53:xxxx:xxx:5::14" VALID_DST_IPv6 = "fd53:xxxx:xxx:5::10" VALID_SPORT = 13400 VALID_DPORT = 13400 INVALID_SPORT = 13455 INVALID_DPORT = 13455 MULTICAST_TTL = 32 VALID_MULTICAST = "ff02::1" INVALID_MULTICAST = "ff02::2" mes = "Default" # Next headers for IPv6 protocols IPV6_NEXT_HEADER_HOP_BY_HOP = 0 IPV6_NEXT_HEADER_TCP = 6 IPV6_NEXT_HEADER_UDP = 17 IPV6_NEXT_HEADER_ICMP = 58 UPPER_LAYER_PROTOCOLS = [ IPV6_NEXT_HEADER_TCP, IPV6_NEXT_HEADER_UDP, IPV6_NEXT_HEADER_ICMP, ] # ICMP Protocol codes ICMP_ECHO_REQUEST = 128 ICMP_ECHO_RESPONSE = 129 # Default hop limit for IPv6 HOP_LIMIT_DEFAULT = 64 def checksum_calculate(data): # Create halfwords from data bytes. Example: data[0] = 0x01, data[1] = 0xb2 => 0x01b2 halfwords = [ ((byte0 << 8) | byte1) for byte0, byte1 in zip_longest(data[::2], data[1::2], fillvalue=0x00) ] checksum = 0 for halfword in halfwords: checksum += halfword checksum = (checksum & 0xFFFF) + (checksum >> 16) checksum ^= 0xFFFF if checksum == 0: return 0xFFFF else: return checksum class IPv6Header(): _version = 6 _header_length = 40 def __init__(self, source_address, destination_address, traffic_class=0, flow_label=0, hop_limit=64, payload_length=0, next_header=0): self.version = self._version self._source_address = self._convert_to_ipaddress(source_address) self._destination_address = self._convert_to_ipaddress(destination_address) self.traffic_class = traffic_class self.flow_label = flow_label self.hop_limit = hop_limit self.payload_length = payload_length self.next_header = next_header def _convert_to_ipaddress(self, value): if isinstance(value, bytearray): value = bytes(value) return ip_address(value) @property def source_address(self): return self._source_address @source_address.setter def source_address(self, value): self._source_address = self._convert_to_ipaddress(value) @property def destination_address(self): return self._destination_address def pack(self): data = bytearray([ ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F), ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F), ((self.flow_label >> 8) & 0xFF), ((self.flow_label & 0xFF)) ]) data += struct.pack(">H", self.payload_length) data += bytearray([self.next_header, self.hop_limit]) data += self.source_address.packed data += self.destination_address.packed return data @classmethod def unpack(cls, data): b = bytearray(data.read(4)) version = (b[0] >> 4) & 0x0F traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F) flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3] payload_length = struct.unpack(">H", data.read(2))[0] next_header = ord(data.read(1)) hop_limit = ord(data.read(1)) src_addr = bytearray(data.read(16)) dst_addr = bytearray(data.read(16)) return cls(src_addr, dst_addr, traffic_class, flow_label, hop_limit, payload_length, next_header) def __repr__(self): return "IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, hop_limit={}, traffic_class={}, flow_label={})".format( self.source_address.compressed, self.destination_address.compressed, self.next_header, self.payload_length, self.hop_limit, self.traffic_class, self.flow_label) def __len__(self): return self._header_length def display_info(self): print("###[ IPv6 ]###") print("version = " + str(self.version)) print("tc = " + str(self.traffic_class)) print("fl = " + str(self.flow_label)) print("plen = " + str(self.payload_length)) print("nh = " + str(self.next_header)) print("hlim = " + str(self.hop_limit)) print("src = " + str(self._source_address)) print("dst = " + str(self._destination_address)) # (Other classes such as TCPHeader, UDPHeader remain the same.) def print_menu(): cloop = True while cloop: print(22 * "-", "MENU", 22 * "-") print("\t1. [Send] {:<24}".format("Send A Valid UDP Packet")) print("\t2. [Send] {:<24}".format("Send An Invalid destination IPv6 UDP Packet")) print("\t3. [Send] {:<24}".format("View log (/log/log_data/ulogd/full.log)")) print("\t4. [Send] {:<24}".format("View valid packet counter")) print("\t5. [Send] {:<24}".format("View invalid packet counter")) print("\t0. [Exit] {:<24}".format("Exit")) print(50 * "-") try: choice = input("Enter your choice [0-5]: ") if (int(choice) >= 0 and int(choice) <= 5): cloop = False except ValueError: print('') return choice def main(): cloop = True while cloop: try: choice = print_menu() if int(choice) == 1: send_udp_packet(src_addr=VALID_SRC_IPv6, dst_addr=VALID_DST_IPv6, src_port=VALID_SPORT, dst_port=VALID_DPORT) elif int(choice) == 2: # UDP invalid destination IP send_udp_packet(src_addr=VALID_SRC_IPv6, dst_addr=INVALID_DST_IPv6, src_port=VALID_SPORT, dst_port=VALID_DPORT) elif int(choice) == 3: try: os.system('tail /log/log_data/ulogd/full.log') except Exception as exx: print(exx) elif int(choice) == 4: try: os.system('ip6tables -nvL wl_uo_5_14_to_5_10') except Exception as exx: print(exx) elif int(choice) == 5: try: os.system('ip6tables -nvL INVALID_IPV6') except Exception as exx: print(exx) elif int(choice) == 0: cloop = False except KeyboardInterrupt: print('\nThanks! See you later!\n\n') cloop = False if __name__ == '__main__': main() ``` for_testing_receiving_packet.py ``` # ***TEST RECEIVING PACKET**** # [Test Steps] # 1. Read current packet record. # $ ip6tables -nvL INPUT # $ ip6tables -nvL INVALID_IPV6 # 2. Run script to send an Invalid data definition packet. # a. Open command prompt in the Python script folder. # b. Run command: python for_testing_receiving_packet.py # c. Input 2 to send an Invalid TCP flags packet. from md_fw_declare import * from md_fw_menu import * # Set the source address to an invalid IPv6 address PKT_Default_Receive[IPv6].src = INVALID_SRC_IPv6 def print_infor(): try: global PKT_Default_Receive print("\n----------Packet-information-------------") PKT_Default_Receive.show() except Exception as ex: print("Error: " + str(ex)) # Function to send a packet def send_packet(): global PKT_Default_Receive try: PKT_Default_Receive.show() sendp(PKT_Default_Receive, iface=IFACE) except: print("Error: Please Connect Ethernet...") # Main function to handle menu and actions def main(): cloop = True while cloop: try: choice = print_menu() if int(choice) == 1: print_infor() # Option 1: Show packet information elif int(choice) == 2: send_packet() # Option 2: Send an invalid packet elif int(choice) == 0: cloop = False # Option 0: Exit the loop except KeyboardInterrupt: print('\nThanks! See you later!\n\n') cloop = False # Exit on keyboard interrupt if __name__ == '__main__': main() # [Expected Results] # Verify that packets with unspecified IPv6 TCP in the data definition are discarded. # 1. Read packet record after sending 1 packet: # $ ip6tables -nvL INPUT # $ ip6tables -nvL INVALID_IPV6 # You can see INVALID_IPV6 has increased by 1. # 2. View log to show details: # $ tail /log/log_data/ulogd/full.log # $ tail /data/log_data/ulogd/full.log # Example log entry: # Input DROP IN=eth0.5 OUT= # MAC=ff:ff:ff:ff:ff:ff:xx:xx:xx:xx:xx:xx:xx:xx:xx:00:00:00 # SRC=fd53:xxxx:xxx:3::10 DST=fd53:xxxx:xx:5::14 # LEN=67 TC=0 HOPLIMIT=64 FLOWLBL=0 PROTO=TCP # SPT=13400 DPT=13400 SEQ=0 ACK=0 WINDOW=8192 SYN URGP=0 MARK=0 ``` ## Demo ### Without FireWall ![image](https://hackmd.io/_uploads/SJ9mVNN4yl.png) Ngoài ra có thể đổi được MAC, IPv4 và cả VlanID nếu đã cài Vlan cho Rasp4 ### Use FireWall #### Rules Chặn IPv4 IPv6 iptables -A INPUT -s 10.10.22.115 -j DROP Payload: ![image](https://hackmd.io/_uploads/H1LvOwrEJe.png) Rules: ![image](https://hackmd.io/_uploads/rkRPODBVye.png) Kết quả: ![image](https://hackmd.io/_uploads/Byd__PH41l.png) => ![image](https://hackmd.io/_uploads/SkeYdvHEkg.png) ip6tables -A INPUT -s fd53:abcd:5678:5::13 -j DROP Payload: ![image](https://hackmd.io/_uploads/HJB9Owr4ke.png) Rules: ![image](https://hackmd.io/_uploads/HJTcuDBVkg.png) Kết quả: ![image](https://hackmd.io/_uploads/B1OsdvHVJx.png) ![image](https://hackmd.io/_uploads/SkM3dPSEyg.png) #### Final Rules Sử dụng WhiteList để cho phép IP nguồn, Port nguồn nhất định nào đó, còn lại DROP INPUT. Rules ![image](https://hackmd.io/_uploads/rJyAuvBNkg.png) Kết quả: ![image](https://hackmd.io/_uploads/By_0dvSNke.png) Với IPv4 ![image](https://hackmd.io/_uploads/HkGJKPBVJl.png) ## Kết quả và nhận xét Về Scapy để gửi gói tin thì đã hoàn thành và cho ra kết quả như mong đợi là gửi gói tin với máy đích cũng như thay đổi IP nguồn để giả mạo tránh rules iptables. Tường lửa vẫn hoạt động tốt và chặn các gói tin như đúng yêu cầu, tuy nhiên thì các payload trong gói tin vẫn hoạt động và exec thành công. Nhận xét: Scapy là thư viện hỗ trợ gửi gói tin tốt trong Python và Iptables có thể drop các gói tin nhưng không thể chặn các payload trong gói tin thực thi. Lí do tường lửa đã chặn nhưng payload vẫn được thực thi: iptables chỉ kiểm tra các thông tin ở Layer 3 (IP) và Layer 4 (TCP) như địa chỉ IP và cổng, chứ không phân tích dữ liệu trong payload (ví dụ lệnh os.system như trong code của nhóm).