import socket
host = '0.0.0.0'
ETH_P_ALL = 0x3
ETH_P_IP = 0x0800
ETH_P_ARP = 0x0806
ETH_P_RARP = 0x8035
ETH_P_IPV6 = 0x086dd
"""
ETH_P_IP 0x800 只接收發往本機mac的ip類型的數據幀
ETH_P_ARP 0x806 只接受發往本機mac的arp類型的數據幀
ETH_P_RARP 0x8035 只接受發往本機mac的rarp類型的數據幀
ETH_P_ALL 0x3 接收發往本機mac的所有類型ip arp rarp的數據幀, 接收從本機發出的所有類型的數據幀
https://blog.csdn.net/zxygww/article/details/44859181
"""
counter = 0
def unpack(packet):
global counter
counter += 1
print(packet)
print(list(packet))
dst_mac = list(packet)[0:6]
out = ""
for i in dst_mac:
if i>=16: out += hex(i)[2:] + ":"
else: out += "0" + hex(i)[2:] + ":"
src_mac = list(packet)[6:12]
print("dst_mac\t\t"+out[:-1])
out = ""
for i in src_mac:
if i>=16: out += hex(i)[2:] + ":"
else: out += "0" + hex(i)[2:] + ":"
print("src_mac\t\t"+out[:-1])
prot = list(packet)[12:14]
if prot == [8, 0]:
print("protocol\tIP")
elif prot == [6, 0]:
print("protocol\t\tXNS")
elif prot == [8, 6]:
print("protocol\tARP")
elif prot == [134, 221]:
# 16進位 86 DD
print("protocol\tIPv6")
# https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
else:
print(f"protocol\t\t{list(packet)[12:14]} unknown")
if hex(*packet[14:15]) == '0x0':
version = '0'
header_length = '0'
else:
version= hex(*packet[14:15])[2::][0:-1]
header_length = hex(*packet[14:15])[2::][1]
if version == '0':
print("Ver\t\t"+version)
else:
print("Ver\t\tIPv"+version)
HL = int(header_length)*4
print("HL\t\t"+str(HL)+" bytes")
if HL >20:
print("options\t\t yes")
else: print("options\t\tno")
TL = list(packet[16:18])
#16 12 34 -> 1*16^3 + 2*16^2 + 3*16 + 4
#10 18 52
TL = TL[0]*256 + TL[1]
print("TL\t\t"+str(TL)+" bytes")
print("payload length\t"+str(TL-HL)+' bytes')
TTL = hex(*packet[22:23])[2::]
print("TTL\t\t"+str(int(TTL,16))+" hops")
PROC = int(*packet[23:24])
# packet[23:24] = b'\x11'
# int(*packet[23:24]) = 17
proc_set={}
proc_set[1] = "ICMP"
proc_set[6] = "TCP"
proc_set[17] = "UDP"
# https://zh.wikipedia.org/zh-tw/IP%E5%8D%8F%E8%AE%AE%E5%8F%B7%E5%88%97%E8%A1%A8
try:
if int(PROC)==1 or int(PROC)==6 or int(PROC)==17:
print("next protocol\t"+proc_set[int(PROC)])
else:
print(f"unknown next protocol\t+{int(*packet[23:24])}")
except ValueError as v:
print(f"unknown next protocol\t+{int(*packet[23:24])}")
print(v)
print(counter)
print("--------------------------------------------")
def main():
s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL))
# htons()--"Host to Network Short"
#socket(PF_PACKET, SOCK_RAW, htons(ETH_P_IP|ETH_P_ARP|ETH_P_ALL))發送接收以太網數據幀
#https://blog.csdn.net/zxygww/article/details/44859181
"""
Raw socket是linux network programming一個很進階的技巧,
大致上說起來就是,跳過socket所在的transport layer,直接往下撈"封包",
也不能說封包(packet),因為raw socket撈到的是frame;
其實就是從linux kernel的ip_recv直接殺一條路出來,
將尚未解析的frame往user space丟,有註冊的application就可以收起來看,
當然就包括L3 header甚至L2 header。
https://wirelessr.gitbooks.io/working-life/content/sockraw_with_tcpdump.html
"""
"""乙太網的偵結構如下:
------------------------------------------------------
| 目的地址 | 源地址 | 型別 | 資料 |
------------------------------------------------------
| 6 byte | 6 byte | 2 byte | 46~1500 byte |
""" #https://www.796t.com/content/1549965263.html
#s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_IP))
while 1:
try:
packet, packet_info = s.recvfrom(4096)
packet = unpack(packet)
except KeyboardInterrupt:
break
except ValueError:
print(ValueError)
if __name__ == '__main__':
main()
藉由Ping可簡單的得知網路的狀況
Ping 是透過ICMP中的 Echo Request / Echo Reply 完成的
發起Ping之前要先透過ARP找出該IP對應的MAC
資料來源
Learn More →
透過UDP、TCP做的聊天室
驗證在TCP建立連線和關閉連線時都會發送封包 而 UDP在中斷連線時並不會發送封包
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import threading
import os
HOST = '0.0.0.0'
PORT = 7000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((HOST, PORT))
s.listen(5)
list_of_clients=[]
list_of_user=[]
record = ""
os.system("clear")
print('等待其他人加入聊天室')
def handle_recv(conn):
global record, list_of_clients, list_of_user
while True:
try:
indata = conn.recv(4096).decode()
who = indata.split("\t: ")[0]
say = indata.split("\t: ")[1]
#record += f"who:{who} say:{say}\n"
if say == "exit":
conn.send(f"exit {who}".encode())
#record += f'send:exit {who}'
list_of_clients.pop(list_of_user.index(who))
list_of_user.remove(who)
conn.close()
os.system("clear")
record += f"---- {who_join}離開了聊天室 ----"
print(record)
os.system("clear")
record += f"{who}\t: {say}\n"
print(record)
tmp = "!"+who+'\t: '+say+'\n'
for conn in list_of_clients:
conn.send(tmp.encode())
except:
pass
def handle_send(conn):
global record, list_of_clients, list_of_user
while True:
outdata = "server\t: "+input("(server)輸入訊息 : ")
if "print" in outdata:
record += f"{list_of_clients}\n"
record += f"{list_of_user}\n"
os.system("clear")
print(record)
elif "kick" in outdata:
for conn in list_of_clients:
conn.send(outdata.encode())
record += f"{list_of_clients}\n"
tmp = outdata.split(" ")
who = outdata.split(" ")[-1]
record += f"who={who}"
outdata = f"!{who} was kicked."
for conn in list_of_clients:
conn.send(outdata.encode())
os.system("clear")
print(record)
list_of_clients.pop(list_of_user.index(who))
list_of_user.remove(who)
else:
for conn in list_of_clients:
conn.send(outdata.encode())
os.system("clear")
record += outdata+'\n'
print(record)
while True:
client, addr = s.accept()
who_join = client.recv(4096).decode()
list_of_clients.append(client)
list_of_user.append(who_join)
os.system("clear")
record += f"---- {who_join}加入了聊天室 ----\n"
print(record)
threading.Thread(target=handle_send, args=(client,)).start()
threading.Thread(target=handle_recv, args=(client,)).start()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import threading
import os
#HOST = '192.168.245.128'
HOST = '192.168.1.109'
PORT = 7000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
record = ""
os.system("clear")
user_name = input("請輸入你的名稱:")
s.send(user_name.encode())
os.system("clear")
print(record)
def handle_send():
global record,user_name
while True:
outdata = input(f'({user_name})輸入訊息\t: ')
outdata = f"{user_name}\t: {outdata}"
if "exit" in outdata:
s.send(f"{user_name}\t: exit".encode())
else:
try:
s.send(outdata.encode())
except:
t1.do_run = False
t2.do_run = False
break
record += f"{outdata}\n"
os.system("clear")
print(record)
def handle_recv():
global record, user_name,t1,t2
while True:
indata = s.recv(4096).decode()
if f"kick {user_name}" in indata:
s.close()
os.system("clear")
print('你被踢出聊天室了!')
t1.do_run = False
t2.do_run = False
break
elif f"exit {user_name}" in indata:
a = indata
s.close()
os.system("clear")
print('你離開了聊天室!')
print(a)
t1.do_run = False
t2.do_run = False
break
elif "!" in indata and indata[0:1:]=='!':
who = indata[1::].split("\t: ")[0]
if who != user_name:
record += indata[1::]
os.system("clear")
print(record)
else:
try:
who = indata.split("\t: ")[0]
say = indata.split("\t: ")[1]
record += f"{who}\t: {say}\n"
os.system("clear")
print(record)
except:
pass
t1 = threading.Thread(target=handle_send, args=())
t1.start()
t2 = threading.Thread(target=handle_recv, args=())
t2.start()
環境 Alpine Linux (使用iSH)
使用 apk add 安裝python
用 vi 編輯檔案 (也可以apk add vim、nano)
vi server.py
按i進入insert mode貼上程式碼 按ESC回到normal mode 按:wq儲存並離開
python3 server.py
如果貼上後排版亂掉可以
:set paste
再重新貼上
or
By clicking below, you agree to our terms of service.
New to HackMD? Sign up