### 系統簡章:使用 Python 進行非同步 Arduino 通信
這份簡章介紹了如何利用 Python 與 asyncio 框架結合 serial_asyncio 庫,實現對 Arduino 的非同步串行通信。該系統旨在提供一個高效的方法來從 Python 程式控制和監測 Arduino。
#### ArduinoProtocol 類別
- **功能概述**:
- **`connection_made`**: 當連接成功建立時被呼叫,並儲存傳輸對象。
- **`data_received`**: 接收到數據時被呼叫,並打印數據內容。
- **`connection_lost`**: 連接丟失時被呼叫,並打印錯誤信息。
```python
class ArduinoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print("Connection established with Arduino.")
def data_received(self, data):
print("Data received:", data.decode())
def connection_lost(self, exc):
print("Connection lost:", exc)
```
#### ArduinoConnector 類別
- **功能概述**:
- **`find_port`**: 掃描並找到連接到計算機的 Arduino 裝置端口。
- **`connect`**: 建立串行連接到 Arduino。如果找到適當的端口,則建立連接,並返回連接的端口名稱;如果未找到,則返回 None。
- **`send`**: 向 Arduino 發送命令。如果連接已建立,則發送數據;否則,打印連接未建立的警告。
```python
class ArduinoConnector:
def __init__(self, loop):
self.port = None
self.loop = loop
self.transport = None
self.protocol = None
async def find_port(self):
arduino_ports = [
p.device for p in list_ports.comports() if "Arduino" in p.description
]
return arduino_ports[0] if arduino_ports else None
async def connect(self):
self.port = await self.find_port()
if self.port is None:
print("No Arduino port found.")
return
try:
self.transport, self.protocol = (
await serial_asyncio.create_serial_connection(
self.loop, ArduinoProtocol, self.port, baudrate=9600
)
)
print(f">> Connected to Arduino on {self.port} <<")
except Exception as e:
print(f"Failed to connect to Arduino port: {e}")
def send(self, command):
if self.transport:
self.transport.write(command.encode())
print(f"Command sent: {command}")
else:
print("Connection is not established.")
```
### 使用方法
1. 確保 Arduino 裝置已正確連接到計算機。
2. 在主函數中呼叫 `ArduinoConnector` 的 `connect` 方法來嘗試建立連接。
3. 通過 `send` 方法向 Arduino 發送指令。
```python
async def main():
loop = asyncio.get_running_loop()
arduino = ArduinoConnector(loop)
await arduino.connect()
arduino.send("Your command here")
if __name__ == "__main__":
asyncio.run(main())
```
這個系統提供了一個簡單而有效的方式來從 Python 控制 Arduino 設備,適用於需要實時數據交互的應用場景。
```python
import asyncio
import serial_asyncio
from serial.tools import list_ports
class ArduinoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Data received:", data.decode())
def connection_lost(self, exc):
print("Connection lost:", exc)
class ArduinoConnector:
def __init__(self, loop):
self.port = None
self.loop = loop
self.transport = None
self.protocol = None
async def find_port(self):
arduino_ports = [
p.device for p in list_ports.comports() if "Arduino" in p.description
]
try:
return arduino_ports[0]
except IndexError:
return None
async def connect(self):
self.port = await self.find_port()
if self.port is None:
print("No Arduino port found.")
return
try:
self.transport, self.protocol = (
await serial_asyncio.create_serial_connection(
self.loop, ArduinoProtocol, self.port, baudrate=9600
)
)
print(f">> Connected to Arduino on {self.port} <<")
return self.port
except Exception as e:
print(f"Failed to connect to Arduino port: {e}")
return None
def send(self, command):
if self.transport:
self.transport.write(command.encode())
print(f"Command sent: {command}")
else:
print("Connection is not established.")
async def main():
loop = asyncio.get_running_loop()
arduino = ArduinoConnector(loop)
await arduino.connect()
arduino.send("Your command here")
if __name__ == "__main__":
asyncio.run(main())
```