### RasperryPi IOT系統簡章:非同步串列通訊至 Arduino #### 功能概述 1. **尋找 Arduino 連接埠**:自動掃描計算機上所有連接的串列埠,找到描述中包含 "Arduino" 的埠。 2. **建立串列連接**:利用找到的 Arduino 連接埠建立非同步串列連接。 3. **發送命令**:向 Arduino 發送命令,並確認命令是否成功發送。 #### 主要組件 1. **ArduinoProtocol 類**: - 負責處理與 Arduino 的通訊事件,包括連接建立、數據接收和連接丟失。 ```python class ArduinoProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport print("Connection established.") def data_received(self, data): print("Data received:", data.decode()) def connection_lost(self, exc): print("Connection lost:", exc) ``` 2. **ArduinoConnector 類**: - 負責尋找合適的 Arduino 連接埠並建立連接。 - 提供一個方法來發送串列命令給 Arduino。 ```python class ArduinoConnector: def __init__(self, loop): self.port = None self.loop = loop self.transport = None self.protocol = None async def find_port(self): arduino_ports = [ p.device for p in list_ports.comports() if "Arduino" in p.description ] try: return arduino_ports[0] except IndexError: return None async def connect(self): self.port = await self.find_port() if self.port is None: print("No Arduino port found.") return try: self.transport, self.protocol = ( await serial_asyncio.create_serial_connection( self.loop, ArduinoProtocol, self.port, baudrate=9600 ) ) print(f">> Connected to Arduino on {self.port} <<") except Exception as e: print(f"Failed to connect to Arduino port: {e}") ``` 3. **發送命令**: - 檢查是否已建立連接,然後發送命令。 ```python def send(self, command): if self.transport: self.transport.write(command.encode()) print(f"Command sent: {command}") else: print("Connection is not established.") ``` #### 使用指南 1. **檢查 Arduino 連接**:確保 Arduino 正確連接到計算機上的一個串列埠。 2. **執行程式**:運行上述 Python 程式,程式將自動尋找 Arduino 連接埠,建立連接,並允許向其發送命令。 此程式適用於需要從計算機控制 Arduino 執行特定任務的情境,例如自動化測試、機器人控制或其他硬件交互項目。 #### 原代碼 ```python import asyncio import serial_asyncio from serial.tools import list_ports class ArduinoProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport def data_received(self, data): print("Data received:", data.decode()) def connection_lost(self, exc): print("Connection lost:", exc) class ArduinoConnector: def __init__(self, loop): self.port = None self.loop = loop self.transport = None self.protocol = None async def find_port(self): arduino_ports = [ p.device for p in list_ports.comports() if "Arduino" in p.description ] try: return arduino_ports[0] except IndexError: return None async def connect(self): self.port = await self.find_port() if self.port is None: print("No Arduino port found.") return try: self.transport, self.protocol = ( await serial_asyncio.create_serial_connection( self.loop, ArduinoProtocol, self.port, baudrate=9600 ) ) print(f">> Connected to Arduino on {self.port} <<") return self.port except Exception as e: print(f"Failed to connect to Arduino port: {e}") return None def send(self, command): if self.transport: self.transport.write(command.encode()) print(f"Command sent: {command}") else: print("Connection is not established.") async def main(): loop = asyncio.get_running_loop() arduino = ArduinoConnector(loop) await arduino.connect() arduino.send("Your command here") if __name__ == "__main__": asyncio.run(main()) ```