## TCP Server

Server.py

```python=
import socket
import threading


class TCPServer:
    """Thread-per-client TCP echo server.

    Every chunk received from a client is printed and answered with
    ``resp: <chunk>``.
    """

    def __init__(self, host='127.0.0.1', port=8889):
        """Create the listening socket; nothing is bound until start().

        Args:
            host: Interface address to bind to.
            port: TCP port to listen on.
        """
        self.server_host = host
        self.server_port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Let the server be restarted right away instead of failing with
        # "Address already in use" while old connections linger.
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.client_sockets = []
        self.is_running = False

    def start(self):
        """Bind, listen, and block in the accept loop until the server stops."""
        try:
            self.server_socket.bind((self.server_host, self.server_port))
            self.server_socket.listen(5)
            self.is_running = True
            print(f"Server On, Address: {self.server_host}:{self.server_port}")
            self.accept_connections()
        except Exception as e:
            print(f"ERROR MESSAGE: {e}")

    def stop(self):
        """Stop accepting connections and disconnect every client."""
        self.is_running = False
        self.server_socket.close()
        # Iterate over a copy: handler threads remove their own socket from
        # the list while they shut down.
        for client_socket in list(self.client_sockets):
            self._close_socket(client_socket)
        print("SERVER IS SHUTDOWN")

    @staticmethod
    def _close_socket(sock):
        """Shut down and close *sock*, tolerating already-dead sockets."""
        try:
            # shutdown() ends the connection immediately so a handler thread
            # blocked in recv() returns; close() alone is not guaranteed to.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # the peer is already gone
        sock.close()

    def accept_connections(self):
        """Accept clients and hand each one to its own handler thread."""
        while self.is_running:
            try:
                client_socket, client_address = self.server_socket.accept()
                print(f"Connection From :{client_address}.")
                self.client_sockets.append(client_socket)
                # Daemon threads cannot keep the process alive after stop().
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket,),
                    daemon=True,
                )
                client_thread.start()
            except Exception as e:
                if not self.is_running:
                    # The listening socket was closed by stop(); this is a
                    # normal shutdown, not an error worth reporting.
                    break
                print(f"ERROR MESSAGE: {e}")

    def handle_client(self, client_socket):
        """Echo loop for one connected client; always cleans up the socket.

        Args:
            client_socket: Connected socket returned by accept().
        """
        # Bound before the try so the finally block can always print it,
        # even when getpeername() itself fails.
        client_address = None
        try:
            client_address = client_socket.getpeername()
            while self.is_running:
                # NOTE: TCP is a byte stream, so one recv() is not guaranteed
                # to line up with one message sent by the client.
                data = client_socket.recv(1024)
                if not data:
                    break  # the peer closed the connection
                recv_data = data.decode('utf-8')
                print(f"Recv Data: {recv_data}")
                self.send_data(client_socket, f"resp: {recv_data}")
        except Exception as e:
            print(f"ERROR MESSAGE: {e}")
        finally:
            client_socket.close()
            try:
                self.client_sockets.remove(client_socket)
            except ValueError:
                pass  # the socket is no longer tracked
            print(f"{client_address}: Connection closed")

    def send_data(self, client_socket, message):
        """Send *message* to the client as UTF-8.

        Args:
            client_socket: Connected socket to write to.
            message: Text to send.
        """
        try:
            # sendall() keeps writing until the whole buffer is sent;
            # send() may transmit only part of it.
            client_socket.sendall(message.encode('utf-8'))
        except Exception as e:
            print(f"ERROR MESSAGE: {e}")


if __name__ == "__main__":
    server = TCPServer(host='0.0.0.0', port=8889)
    try:
        server.start()
    except KeyboardInterrupt:
        server.stop()
```

Client.py

```python=
import socket
import json  # used to serialize dict messages


class TCPClient:
    """Minimal TCP client that sends one message and prints one reply."""

    def __init__(self, server_ip='127.0.0.1', server_port=8889):
        """Create the client socket; no connection is made until connect().

        Args:
            server_ip: Address of the server.
            server_port: TCP port of the server.
        """
        self.server_ip = server_ip
        self.server_port = server_port
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def connect(self):
        """Connect to the TCP server.

        Returns:
            True when the connection was established, False otherwise.
        """
        try:
            self.client_socket.connect((self.server_ip, self.server_port))
        except Exception as e:
            print(f"連接伺服器時發生錯誤: {e}")
            return False
        print(f"已成功連接到伺服器 {self.server_ip}:{self.server_port}")
        return True

    def send_message(self, message):
        """Send a message to the server and wait for a single reply.

        Args:
            message: A str sent as-is, or a dict that is serialized to JSON
                before sending.

        Returns:
            The decoded reply, or None if sending failed or the server
            closed the connection.
        """
        try:
            if isinstance(message, dict):
                # Dicts are serialized to a JSON string before sending.
                message_str = json.dumps(message)
                print("已自動將字典轉換為JSON格式")
            else:
                # Strings are sent unchanged.
                message_str = message

            # sendall() keeps writing until the whole buffer is sent;
            # send() may transmit only part of it.
            self.client_socket.sendall(message_str.encode('utf-8'))
            print(f"已發送訊息: {message_str}")

            response = self.client_socket.recv(1024)
            if not response:
                # An empty read means the server closed the connection.
                print("伺服器已關閉連接")
                return None
            response_text = response.decode('utf-8')
            print(f"來自伺服器的回應: {response_text}")
            return response_text
        except Exception as e:
            print(f"發送訊息時發生錯誤: {e}")
            return None

    def close(self):
        """Close the connection to the server."""
        try:
            self.client_socket.close()
            print("已關閉與伺服器的連接")
        except Exception as e:
            print(f"關閉連接時發生錯誤: {e}")


if __name__ == "__main__":
    client = TCPClient(server_ip='127.0.0.1', server_port=8889)
    try:
        # Only talk to the server when the connection actually succeeded.
        if client.connect():
            # Message in JSON form (sent as a dict).
            send_message_dict = {
                "deviceId": "TC003",
                "time_plan": 1,
                "phaseOrder": "00",
            }
            client.send_message(send_message_dict)

            # Plain-text message.
            client.send_message("Hello Server")
    finally:
        # Always release the socket.
        client.close()
```