### RasperryPi IOT系統簡章:非同步串列通訊至 Arduino
#### 功能概述
1. **尋找 Arduino 連接埠**:自動掃描計算機上所有連接的串列埠,找到描述中包含 "Arduino" 的埠。
2. **建立串列連接**:利用找到的 Arduino 連接埠建立非同步串列連接。
3. **發送命令**:向 Arduino 發送命令,並確認命令是否成功發送。
#### 主要組件
1. **ArduinoProtocol 類**:
- 負責處理與 Arduino 的通訊事件,包括連接建立、數據接收和連接丟失。
```python
class ArduinoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print("Connection established.")
def data_received(self, data):
print("Data received:", data.decode())
def connection_lost(self, exc):
print("Connection lost:", exc)
```
2. **ArduinoConnector 類**:
- 負責尋找合適的 Arduino 連接埠並建立連接。
- 提供一個方法來發送串列命令給 Arduino。
```python
class ArduinoConnector:
def __init__(self, loop):
self.port = None
self.loop = loop
self.transport = None
self.protocol = None
async def find_port(self):
arduino_ports = [
p.device for p in list_ports.comports() if "Arduino" in p.description
]
try:
return arduino_ports[0]
except IndexError:
return None
async def connect(self):
self.port = await self.find_port()
if self.port is None:
print("No Arduino port found.")
return
try:
self.transport, self.protocol = (
await serial_asyncio.create_serial_connection(
self.loop, ArduinoProtocol, self.port, baudrate=9600
)
)
print(f">> Connected to Arduino on {self.port} <<")
except Exception as e:
print(f"Failed to connect to Arduino port: {e}")
```
3. **發送命令**:
- 檢查是否已建立連接,然後發送命令。
```python
def send(self, command):
if self.transport:
self.transport.write(command.encode())
print(f"Command sent: {command}")
else:
print("Connection is not established.")
```
#### 使用指南
1. **檢查 Arduino 連接**:確保 Arduino 正確連接到計算機上的一個串列埠。
2. **執行程式**:運行上述 Python 程式,程式將自動尋找 Arduino 連接埠,建立連接,並允許向其發送命令。
此程式適用於需要從計算機控制 Arduino 執行特定任務的情境,例如自動化測試、機器人控制或其他硬件交互項目。
#### 原代碼
```python
import asyncio
import serial_asyncio
from serial.tools import list_ports
class ArduinoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Data received:", data.decode())
def connection_lost(self, exc):
print("Connection lost:", exc)
class ArduinoConnector:
def __init__(self, loop):
self.port = None
self.loop = loop
self.transport = None
self.protocol = None
async def find_port(self):
arduino_ports = [
p.device for p in list_ports.comports() if "Arduino" in p.description
]
try:
return arduino_ports[0]
except IndexError:
return None
async def connect(self):
self.port = await self.find_port()
if self.port is None:
print("No Arduino port found.")
return
try:
self.transport, self.protocol = (
await serial_asyncio.create_serial_connection(
self.loop, ArduinoProtocol, self.port, baudrate=9600
)
)
print(f">> Connected to Arduino on {self.port} <<")
return self.port
except Exception as e:
print(f"Failed to connect to Arduino port: {e}")
return None
def send(self, command):
if self.transport:
self.transport.write(command.encode())
print(f"Command sent: {command}")
else:
print("Connection is not established.")
async def main():
loop = asyncio.get_running_loop()
arduino = ArduinoConnector(loop)
await arduino.connect()
arduino.send("Your command here")
if __name__ == "__main__":
asyncio.run(main())
```