Try   HackMD

透過Python socket 解析封包 和 製作簡易聊天室


解析封包

import socket

host = '0.0.0.0'

ETH_P_ALL = 0x3

ETH_P_IP = 0x0800
ETH_P_ARP = 0x0806
ETH_P_RARP = 0x8035
ETH_P_IPV6 = 0x086dd

"""
ETH_P_IP 0x800 只接收發往本機mac的ip類型的數據幀
ETH_P_ARP 0x806 只接受發往本機mac的arp類型的數據幀
ETH_P_RARP 0x8035 只接受發往本機mac的rarp類型的數據幀
ETH_P_ALL 0x3 接收發往本機mac的所有類型ip arp rarp的數據幀, 接收從本機發出的所有類型的數據幀

https://blog.csdn.net/zxygww/article/details/44859181
"""

counter = 0

def unpack(packet):
    global counter
    counter += 1

    print(packet)
    print(list(packet))

    dst_mac = list(packet)[0:6]
    out = ""
    for i in dst_mac:
        if i>=16: out += hex(i)[2:] + ":"
        else: out += "0" + hex(i)[2:] + ":"

    src_mac = list(packet)[6:12]
    print("dst_mac\t\t"+out[:-1])
    out = ""
    for i in src_mac:
        if i>=16: out += hex(i)[2:] + ":"
        else: out += "0" + hex(i)[2:] + ":"
    print("src_mac\t\t"+out[:-1])

    prot = list(packet)[12:14]
    if prot == [8, 0]:
        print("protocol\tIP")
    elif prot == [6, 0]:
        print("protocol\t\tXNS")
    elif prot == [8, 6]:
        print("protocol\tARP")
    elif prot == [134, 221]:
        # 16進位    86 DD
        print("protocol\tIPv6")
    # https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
    else:
        print(f"protocol\t\t{list(packet)[12:14]} unknown")


    if hex(*packet[14:15]) == '0x0':
        version = '0'
        header_length = '0'
    else:
        version= hex(*packet[14:15])[2::][0:-1]
        header_length = hex(*packet[14:15])[2::][1]

    if version == '0':
        print("Ver\t\t"+version)
    else:
        print("Ver\t\tIPv"+version)
    HL = int(header_length)*4
    print("HL\t\t"+str(HL)+" bytes")

    if HL >20:
        print("options\t\t yes")
    else: print("options\t\tno")


    TL = list(packet[16:18])
    #16  12 34 -> 1*16^3 + 2*16^2 + 3*16 + 4
    #10  18 52
    TL = TL[0]*256 + TL[1]
    
    
    
    print("TL\t\t"+str(TL)+" bytes")

    print("payload length\t"+str(TL-HL)+' bytes')

    TTL = hex(*packet[22:23])[2::]
    print("TTL\t\t"+str(int(TTL,16))+" hops")


    PROC = int(*packet[23:24])
    # packet[23:24] = b'\x11'
    # int(*packet[23:24]) = 17
    proc_set={}
    proc_set[1] = "ICMP"
    proc_set[6] = "TCP"
    proc_set[17] = "UDP"
    # https://zh.wikipedia.org/zh-tw/IP%E5%8D%8F%E8%AE%AE%E5%8F%B7%E5%88%97%E8%A1%A8

    try:
        if int(PROC)==1 or int(PROC)==6 or int(PROC)==17:
            print("next protocol\t"+proc_set[int(PROC)])
        else:
            print(f"unknown next protocol\t+{int(*packet[23:24])}")
    except ValueError as v:
        print(f"unknown next protocol\t+{int(*packet[23:24])}")
        print(v)

    print(counter)
    print("--------------------------------------------")

def main():
    s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL))
    #                                                    htons()--"Host to Network Short"
    #socket(PF_PACKET, SOCK_RAW, htons(ETH_P_IP|ETH_P_ARP|ETH_P_ALL))發送接收以太網數據幀
    #https://blog.csdn.net/zxygww/article/details/44859181
    
    """
    Raw socket是linux network programming一個很進階的技巧,
    大致上說起來就是,跳過socket所在的transport layer,直接往下撈"封包",
    也不能說封包(packet),因為raw socket撈到的是frame;
    其實就是從linux kernel的ip_recv直接殺一條路出來,
    將尚未解析的frame往user space丟,有註冊的application就可以收起來看,
    當然就包括L3 header甚至L2 header。
    
    https://wirelessr.gitbooks.io/working-life/content/sockraw_with_tcpdump.html
    """
    
    """乙太網的偵結構如下:
    ------------------------------------------------------
    | 目的地址   | 源地址   | 型別     | 資料              |
    ------------------------------------------------------
    |   6 byte   | 6 byte   | 2 byte   | 46~1500 byte   |
    """ #https://www.796t.com/content/1549965263.html
    
    #s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
    while 1:
        try:
            packet, packet_info = s.recvfrom(4096)
            packet = unpack(packet)
            
        except KeyboardInterrupt:
            break
            
        except ValueError:
            print(ValueError)

if __name__ == '__main__':
    main()

實驗 ping

藉由Ping可簡單的得知網路的狀況
Ping 是透過ICMP中的 Echo Request / Echo Reply 完成的
發起Ping之前要先透過ARP找出該IP對應的MAC
資料來源

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →


實驗 UDP、TCP

透過UDP、TCP做的聊天室
驗證在TCP建立連線和關閉連線時都會發送封包 而 UDP在中斷連線時並不會發送封包


製作簡易聊天室

TCP/IP 网络通信之 Socket 编程入门


server.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- import socket import threading import os HOST = '0.0.0.0' PORT = 7000 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((HOST, PORT)) s.listen(5) list_of_clients=[] list_of_user=[] record = "" os.system("clear") print('等待其他人加入聊天室') def handle_recv(conn): global record, list_of_clients, list_of_user while True: try: indata = conn.recv(4096).decode() who = indata.split("\t: ")[0] say = indata.split("\t: ")[1] #record += f"who:{who} say:{say}\n" if say == "exit": conn.send(f"exit {who}".encode()) #record += f'send:exit {who}' list_of_clients.pop(list_of_user.index(who)) list_of_user.remove(who) conn.close() os.system("clear") record += f"---- {who_join}離開了聊天室 ----" print(record) os.system("clear") record += f"{who}\t: {say}\n" print(record) tmp = "!"+who+'\t: '+say+'\n' for conn in list_of_clients: conn.send(tmp.encode()) except: pass def handle_send(conn): global record, list_of_clients, list_of_user while True: outdata = "server\t: "+input("(server)輸入訊息 : ") if "print" in outdata: record += f"{list_of_clients}\n" record += f"{list_of_user}\n" os.system("clear") print(record) elif "kick" in outdata: for conn in list_of_clients: conn.send(outdata.encode()) record += f"{list_of_clients}\n" tmp = outdata.split(" ") who = outdata.split(" ")[-1] record += f"who={who}" outdata = f"!{who} was kicked." for conn in list_of_clients: conn.send(outdata.encode()) os.system("clear") print(record) list_of_clients.pop(list_of_user.index(who)) list_of_user.remove(who) else: for conn in list_of_clients: conn.send(outdata.encode()) os.system("clear") record += outdata+'\n' print(record) while True: client, addr = s.accept() who_join = client.recv(4096).decode() list_of_clients.append(client) list_of_user.append(who_join) os.system("clear") record += f"---- {who_join}加入了聊天室 ----\n" print(record) threading.Thread(target=handle_send, args=(client,)).start() threading.Thread(target=handle_recv, args=(client,)).start()

client.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- import socket import threading import os #HOST = '192.168.245.128' HOST = '192.168.1.109' PORT = 7000 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) record = "" os.system("clear") user_name = input("請輸入你的名稱:") s.send(user_name.encode()) os.system("clear") print(record) def handle_send(): global record,user_name while True: outdata = input(f'({user_name})輸入訊息\t: ') outdata = f"{user_name}\t: {outdata}" if "exit" in outdata: s.send(f"{user_name}\t: exit".encode()) else: try: s.send(outdata.encode()) except: t1.do_run = False t2.do_run = False break record += f"{outdata}\n" os.system("clear") print(record) def handle_recv(): global record, user_name,t1,t2 while True: indata = s.recv(4096).decode() if f"kick {user_name}" in indata: s.close() os.system("clear") print('你被踢出聊天室了!') t1.do_run = False t2.do_run = False break elif f"exit {user_name}" in indata: a = indata s.close() os.system("clear") print('你離開了聊天室!') print(a) t1.do_run = False t2.do_run = False break elif "!" in indata and indata[0:1:]=='!': who = indata[1::].split("\t: ")[0] if who != user_name: record += indata[1::] os.system("clear") print(record) else: try: who = indata.split("\t: ")[0] say = indata.split("\t: ")[1] record += f"{who}\t: {say}\n" os.system("clear") print(record) except: pass t1 = threading.Thread(target=handle_send, args=()) t1.start() t2 = threading.Thread(target=handle_recv, args=()) t2.start()

在IPhone上使用聊天室

環境 Alpine Linux (使用iSH)
使用 apk add 安裝python
用 vi 編輯檔案 (也可以apk add vim、nano)

vi server.py
按i進入insert mode貼上程式碼 按ESC回到normal mode 按:wq儲存並離開
python3 server.py

如果貼上後排版亂掉可以:set paste再重新貼上