"""Minimal Ethernet / IPv4 sniffer built on a Linux raw packet socket.

Every received frame is dumped to stdout: the raw bytes, both MAC
addresses, the EtherType and, for IPv4 frames, the main IPv4 header fields.
Must be run as root on Linux (packet sockets are Linux-only).
"""
import socket

host = '0.0.0.0'  # kept from the original notes; not used by the sniffer

# Protocol selectors passed (via htons) as the third socket() argument:
#   ETH_P_IP   0x0800  only IP frames addressed to this host's MAC
#   ETH_P_ARP  0x0806  only ARP frames addressed to this host's MAC
#   ETH_P_RARP 0x8035  only RARP frames addressed to this host's MAC
#   ETH_P_ALL  0x0003  every frame type addressed to this host's MAC, plus
#                      every frame sent by this host
# https://blog.csdn.net/zxygww/article/details/44859181
ETH_P_ALL = 0x3
ETH_P_IP = 0x0800
ETH_P_ARP = 0x0806
ETH_P_RARP = 0x8035
ETH_P_IPV6 = 0x86dd

# Ethernet II header: 6-byte destination, 6-byte source, 2-byte EtherType.
ETHERNET_HEADER_LEN = 14
# An IPv4 header without options is 20 bytes (IHL == 5).
IPV4_MIN_HEADER_LEN = 20

# EtherType -> line printed for it.
# https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
_ETHERTYPE_LINES = {
    ETH_P_IP: "protocol\tIP",
    0x0600: "protocol\t\tXNS",
    ETH_P_ARP: "protocol\tARP",
    ETH_P_IPV6: "protocol\tIPv6",
}

# IPv4 "protocol" field values this tool can name.
# https://zh.wikipedia.org/zh-tw/IP%E5%8D%8F%E8%AE%AE%E5%8F%B7%E5%88%97%E8%A1%A8
_IP_PROTOCOLS = {1: "ICMP", 6: "TCP", 17: "UDP"}

# Number of frames passed to unpack() so far.
counter = 0


def _format_mac(raw):
    """Return the bytes in *raw* as lowercase colon-separated hex pairs."""
    return ":".join(f"{octet:02x}" for octet in raw)


def _print_ipv4_header(header):
    """Print the IPv4 fields found at the start of *header* (bytes)."""
    if len(header) < IPV4_MIN_HEADER_LEN:
        print(f"truncated IPv4 header\t{len(header)} bytes")
        return

    # First byte: high nibble is the version, low nibble is the header
    # length in 32-bit words.  Bit operations work for every byte value,
    # unlike slicing the text returned by hex().
    version = header[0] >> 4
    header_len = (header[0] & 0x0F) * 4
    if version == 0:
        print("Ver\t\t0")
    else:
        print(f"Ver\t\tIPv{version}")
    print("HL\t\t" + str(header_len) + " bytes")
    if header_len > IPV4_MIN_HEADER_LEN:
        print("options\t\t yes")
    else:
        print("options\t\tno")

    # Total length is a big-endian 16-bit value: high_byte * 256 + low_byte.
    total_len = int.from_bytes(header[2:4], "big")
    print("TL\t\t" + str(total_len) + " bytes")
    print("payload length\t" + str(total_len - header_len) + ' bytes')

    print("TTL\t\t" + str(header[8]) + " hops")

    next_proto = header[9]
    proto_name = _IP_PROTOCOLS.get(next_proto)
    if proto_name is not None:
        print("next protocol\t" + proto_name)
    else:
        # The original message carried a stray "+" left over from string
        # concatenation; the number is now printed on its own.
        print(f"unknown next protocol\t{next_proto}")


def unpack(packet):
    """Print a human-readable breakdown of one Ethernet frame.

    Args:
        packet: the raw frame as ``bytes``, starting at the destination MAC.

    Returns:
        None.  Everything is written to stdout and the module-level
        ``counter`` is incremented once per call.

    The IPv4 header fields are decoded only when the EtherType is 0x0800.
    Frames that are too short to contain the expected header are reported
    as truncated instead of raising.
    """
    global counter
    counter += 1
    print(packet)
    print(list(packet))

    if len(packet) < ETHERNET_HEADER_LEN:
        print(f"truncated frame\t\t{len(packet)} bytes")
    else:
        print("dst_mac\t\t" + _format_mac(packet[0:6]))
        print("src_mac\t\t" + _format_mac(packet[6:12]))

        ethertype = int.from_bytes(packet[12:14], "big")
        protocol_line = _ETHERTYPE_LINES.get(ethertype)
        if protocol_line is None:
            protocol_line = f"protocol\t\t{list(packet[12:14])} unknown"
        print(protocol_line)

        if ethertype == ETH_P_IP:
            _print_ipv4_header(packet[ETHERNET_HEADER_LEN:])

    print(counter)
    print("--------------------------------------------")


def main():
    """Capture frames from every interface and print each one until Ctrl-C."""
    # A raw packet socket bypasses the transport layer and hands whole link
    # layer frames (L2 header included) to user space, so what arrives here
    # is a frame rather than an IP packet.  htons() means "host to network
    # short"; passing ETH_P_IP instead of ETH_P_ALL would capture IPv4 only.
    # https://wirelessr.gitbooks.io/working-life/content/sockraw_with_tcpdump.html
    # https://blog.csdn.net/zxygww/article/details/44859181
    #
    # Ethernet frame layout (https://www.796t.com/content/1549965263.html):
    #   ------------------------------------------------------
    #   | destination |  source  |  type   |      data       |
    #   ------------------------------------------------------
    #   |   6 bytes   | 6 bytes  | 2 bytes | 46~1500 bytes   |
    with socket.socket(socket.PF_PACKET, socket.SOCK_RAW,
                       socket.htons(ETH_P_ALL)) as s:
        while True:
            try:
                # 65535 is large enough for any frame the kernel delivers,
                # so nothing is silently cut off.
                packet, _packet_info = s.recvfrom(65535)
                unpack(packet)
            except KeyboardInterrupt:
                break
            except ValueError as err:
                print(err)


if __name__ == '__main__':
    main()
# NOTE: the two sections below are two separate programs.  Save the first
# as server.py and the second as client.py; they are not meant to be run as
# one file.
#
# Wire format (unchanged from the original notes, no message framing):
#   client -> server   "<name>"                 first message after connect
#   client -> server   "<name>\t: <text>"       chat line; text "exit" leaves
#   server -> client   "!<name>\t: <text>\n"    chat line relayed to everyone
#   server -> client   "server\t: <text>"       line typed on the server
#   server -> client   "server\t: kick <name>"  tells <name> it was kicked
#   server -> client   "exit <name>"            confirms <name>'s exit

# ============================== server.py ==============================
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import socket
import threading

HOST = '0.0.0.0'
PORT = 7000
SEPARATOR = "\t: "

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((HOST, PORT))
s.listen(5)

# Parallel lists: list_of_user[i] is the name registered for
# list_of_clients[i].  clients_lock keeps the pair in step across threads.
list_of_clients = []
list_of_user = []
clients_lock = threading.Lock()
record = ""

os.system("clear")
print('等待其他人加入聊天室')


def _show(text):
    """Append *text* to the transcript and redraw the whole screen."""
    global record
    record += text
    os.system("clear")
    print(record)


def _broadcast(text):
    """Send *text* to every registered client, skipping peers that fail."""
    payload = text.encode()
    with clients_lock:
        peers = list(list_of_clients)  # snapshot: the list may change
    for peer in peers:
        try:
            peer.sendall(payload)
        except OSError:
            # The peer's own receive thread notices the broken connection
            # and removes it; nothing to do here.
            pass


def _drop_client(conn):
    """Unregister *conn*; return its user name, or None if not registered."""
    with clients_lock:
        if conn not in list_of_clients:
            return None
        position = list_of_clients.index(conn)
        list_of_clients.pop(position)
        return list_of_user.pop(position)


def handle_recv(conn):
    """Relay everything received on *conn* until that client leaves.

    Args:
        conn: the connected client socket this thread is responsible for.

    Returns when the client sends "exit", disconnects, or is kicked.
    """
    while True:
        try:
            data = conn.recv(4096)
        except OSError:
            data = b""
        if not data:
            # Empty read means the peer closed (or was shut down by a kick).
            # Returning here is what prevents a busy loop on a dead socket.
            who = _drop_client(conn)
            conn.close()
            if who is not None:
                _show(f"---- {who}離開了聊天室 ----\n")
            return

        who, found, say = data.decode(errors="replace").partition(SEPARATOR)
        if not found:
            continue  # not a "<name>\t: <text>" message; ignore it

        if say == "exit":
            try:
                conn.sendall(f"exit {who}".encode())
            except OSError:
                pass
            _drop_client(conn)
            conn.close()
            _show(f"---- {who}離開了聊天室 ----\n")
            _broadcast(f"!{who}\t: exit\n")
            return

        _show(f"{who}\t: {say}\n")
        _broadcast(f"!{who}\t: {say}\n")


def handle_send(conn=None):
    """Read lines typed on the server console and act on them.

    Args:
        conn: unused; accepted only so existing callers that pass a socket
            keep working.

    Console commands (matched on the first word of the line):
        print        show the registered sockets and user names
        kick <name>  disconnect <name> and tell everyone
    Any other line is sent to every client as "server\t: <line>".
    """
    while True:
        try:
            text = input("(server)輸入訊息 : ")
        except EOFError:
            return  # stdin closed: no more console input is possible
        outdata = "server\t: " + text
        command, _, argument = text.strip().partition(" ")
        who = argument.strip()

        if command == "print":
            _show(f"{list_of_clients}\n{list_of_user}\n")
        elif command == "kick" and who:
            with clients_lock:
                target = None
                if who in list_of_user:
                    target = list_of_clients[list_of_user.index(who)]
            if target is None:
                _show(f"{who} is not in the chat room.\n")
                continue
            # Everyone, including the target, sees the kick command.
            _broadcast(f"server\t: kick {who}")
            _drop_client(target)
            try:
                # shutdown() wakes the target's receive thread, which then
                # closes the socket.
                target.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            _broadcast(f"!{who} was kicked.\n")
            _show(f"{who} was kicked.\n")
        else:
            _broadcast(outdata)
            _show(outdata + '\n')


def _serve_client(client):
    """Register a freshly accepted client, then relay its messages."""
    try:
        who_join = client.recv(4096).decode(errors="replace")
    except OSError:
        who_join = ""
    if not who_join:
        client.close()  # disconnected before sending a name
        return
    with clients_lock:
        list_of_clients.append(client)
        list_of_user.append(who_join)
    _show(f"---- {who_join}加入了聊天室 ----\n")
    handle_recv(client)


# One console thread for the whole server: starting one per client would
# leave several threads competing for the same stdin.
threading.Thread(target=handle_send, args=(None,)).start()

while True:
    client, addr = s.accept()
    # The name is read inside the per-client thread so that a slow client
    # cannot stall accept() for everyone else.
    threading.Thread(target=_serve_client, args=(client,)).start()


# ============================== client.py ==============================
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import socket
import threading

#HOST = '192.168.245.128'
HOST = '192.168.1.109'
PORT = 7000
SEPARATOR = "\t: "

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
record = ""

os.system("clear")
user_name = input("請輸入你的名稱:")
while not user_name.strip():
    # An empty name would send nothing, and the server would then take the
    # first chat line as the name.
    user_name = input("請輸入你的名稱:")
s.sendall(user_name.encode())
os.system("clear")
print(record)

# Set once the session is over so the other thread stops as well.
stop_event = threading.Event()


def _show(text):
    """Append *text* to the transcript and redraw the whole screen."""
    global record
    record += text
    os.system("clear")
    print(record)


def handle_send():
    """Send each typed line to the server until the session ends.

    Typing exactly "exit" asks the server to end the session.  The loop
    also stops when the socket can no longer be written to or when the
    receive thread has flagged the session as over.
    """
    while not stop_event.is_set():
        try:
            text = input(f'({user_name})輸入訊息\t: ')
        except EOFError:
            text = "exit"  # stdin closed: leave cleanly
        if stop_event.is_set():
            break  # session ended while waiting for input
        leaving = text.strip() == "exit"
        if leaving:
            text = "exit"  # the server compares against exactly "exit"
        outdata = f"{user_name}{SEPARATOR}{text}"
        try:
            s.sendall(outdata.encode())
        except OSError:
            stop_event.set()
            break
        if leaving:
            break
        _show(f"{outdata}\n")


def handle_recv():
    """Display everything the server sends until the session ends.

    Ends when the server confirms this user's exit, kicks this user, or the
    connection is closed.  After a kick or a lost connection the send
    thread is still waiting in input(); it stops on the next Enter.
    """
    # Compared for equality, not substring: otherwise another user's chat
    # text, or a name that is a prefix of this one, would trigger them.
    kick_notice = f"server{SEPARATOR}kick {user_name}"
    exit_notice = f"exit {user_name}"
    while True:
        try:
            data = s.recv(4096)
        except OSError:
            data = b""
        if not data:
            # Empty read: the server closed the connection.
            os.system("clear")
            print('與伺服器的連線已中斷!')
            break

        indata = data.decode(errors="replace")
        if indata == kick_notice:
            os.system("clear")
            print('你被踢出聊天室了!')
            break
        if indata == exit_notice:
            os.system("clear")
            print('你離開了聊天室!')
            break

        if indata.startswith("!"):
            # Relayed chat line; skip the echo of our own message.
            who = indata[1:].split(SEPARATOR)[0]
            if who != user_name:
                _show(indata[1:])
        else:
            who, found, say = indata.partition(SEPARATOR)
            if found:
                _show(f"{who}{SEPARATOR}{say}\n")

    stop_event.set()
    s.close()


t1 = threading.Thread(target=handle_send, args=())
t1.start()
t2 = threading.Thread(target=handle_recv, args=())
t2.start()