## TCP Server
Server.py
```python=
import socket
import threading
class TCPServer:
def __init__(self, host='127.0.0.1', port=8889):
self.server_host = host
self.server_port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_sockets = []
self.is_running = False
def start(self):
"""啟動 TCP 伺服器"""
try:
self.server_socket.bind((self.server_host, self.server_port))
self.server_socket.listen(5)
self.is_running = True
print(f"Server On, Address: {self.server_host}:{self.server_port}")
self.accept_connections()
except Exception as e:
print(f"ERROR MESSAGE: {e}")
def stop(self):
"""停止 TCP 伺服器"""
self.is_running = False
self.server_socket.close()
for client_socket in self.client_sockets:
client_socket.close()
print("SERVER IS SHUTDOWN")
def accept_connections(self):
"""接受客戶端的連接請求"""
while self.is_running:
try:
client_socket, client_address = self.server_socket.accept()
print(f"Connection From :{client_address}.")
self.client_sockets.append(client_socket)
client_thread = threading.Thread(target=self.handle_client, args=(client_socket,))
client_thread.start()
except Exception as e:
print(f"ERROR MESSAGE: {e}")
def handle_client(self, client_socket):
"""處理與已連接客戶端的通訊"""
try:
client_address = client_socket.getpeername()
while self.is_running:
data = client_socket.recv(1024)
recv_data = data.decode('utf-8')
if not data:
break
print(f"Recv Data: {recv_data}")
self.send_data(client_socket, f"resp: {data.decode('utf-8')}")
except Exception as e:
print(f"ERROR MESSAGE: {e}")
finally:
client_socket.close()
self.client_sockets.remove(client_socket)
print(f"{client_address}: Connection closed")
def send_data(self, client_socket, message):
"""向客戶端發送資料"""
try:
client_socket.send(message.encode('utf-8'))
except Exception as e:
print(f"ERROR MESSAGE: {e}")
if __name__ == "__main__":
server = TCPServer(host='0.0.0.0', port=8889)
try:
server.start()
except KeyboardInterrupt:
server.stop()
```
Client.py
```python=
import socket
import json # 引入json模組
class TCPClient:
def __init__(self, server_ip='127.0.0.1', server_port=8889):
self.server_ip = server_ip
self.server_port = server_port
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def connect(self):
"""連接到 TCP 伺服器"""
try:
self.client_socket.connect((self.server_ip, self.server_port))
print(f"已成功連接到伺服器 {self.server_ip}:{self.server_port}")
except Exception as e:
print(f"連接伺服器時發生錯誤: {e}")
def send_message(self, message):
"""發送訊息到伺服器並接收回應"""
try:
# 檢查訊息類型
if isinstance(message, dict):
# 如果是字典,轉換為JSON字串
message_str = json.dumps(message)
print("已自動將字典轉換為JSON格式")
else:
# 如果是字串,直接發送
message_str = message
# 發送訊息到伺服器
self.client_socket.send(message_str.encode('utf-8'))
print(f"已發送訊息: {message_str}")
# 接收來自伺服器的回應
response = self.client_socket.recv(1024)
print(f"來自伺服器的回應: {response.decode('utf-8')}")
except Exception as e:
print(f"發送訊息時發生錯誤: {e}")
def close(self):
"""關閉與伺服器的連接"""
try:
self.client_socket.close()
print("已關閉與伺服器的連接")
except Exception as e:
print(f"關閉連接時發生錯誤: {e}")
if __name__ == "__main__":
client = TCPClient(server_ip='127.0.0.1', server_port=8889)
try:
client.connect()
# 傳送JSON格式的訊息
send_message_dict = {
"deviceId": "TC003",
"time_plan": 1,
"phaseOrder": "00",
}
# 發送字典訊息給伺服器
client.send_message(send_message_dict)
# 傳送純文字的訊息
client.send_message("Hello Server")
finally:
# 關閉連接
client.close()
```