### RasperryPi IOT系統簡章:基於 aiohttp 的車輛控制與視頻流服務
這份簡章介紹了如何使用 Python 的 `aiohttp`、`asyncio`、`OpenCV` 和 `pyserial` 函式庫來建立一個非同步 Web 服務器。該服務器能夠控制通過串列埠連接的 Arduino 控制車輛,並從連接的攝像頭實時傳輸視頻。
#### 功能概述
1. **命令處理**:伺服器提供 `/move` 端點接收 POST 請求,將命令透過串列埠發送至 Arduino 控制的車輛。
2. **視頻流傳輸**:通過 `/video_feed` 端點,伺服器以 MJPEG 格式實時傳輸來自攝像頭的視頻,供用戶實時查看。
#### 主要組件與程式碼說明
1. **CarController 類**:
- 使用 `serial` 函式庫來處理與 Arduino 的串行通信。
- 提供命令處理與視頻流的方法。
```python
class CarController:
def __init__(self):
self.serial_port = serial.Serial("COM4", 9600, timeout=1)
def process_command(self, command):
print(f"處理 Arduino 命令: {command}")
if self.serial_port.is_open:
self.serial_port.write((command + '\n').encode())
print(f"命令發送到 Arduino: {command}")
else:
print("串行端口未打開。")
```
2. **命令處理方法 (`handle_move`)**:
- 解析從 HTTP 請求中接收到的 JSON 數據。
- 將命令發送至連接的 Arduino 裝置。
- 回應客戶端命令已被處理的訊息。
```python
async def handle_move(self, request):
data = await request.json()
command = data.get('command')
if command:
self.process_command(command)
return web.Response(text=f"命令 '{command}' 已處理。")
return web.Response(text="請求中沒有包含有效的命令。")
```
3. **視頻流傳輸方法 (`stream_video`)**:
- 初始化攝像頭並持續讀取影像。
- 將影像編碼為 JPEG 格式並透過 HTTP 傳輸。
```python
async def stream_video(self, request):
response = StreamResponse()
response.content_type = 'multipart/x-mixed-replace; boundary=frame'
await response.prepare(request)
cap = cv2.VideoCapture(0)
if not cap.isOpened():
raise RuntimeError("無法啟動攝像頭。")
try:
while True:
ret, frame = cap.read()
if not ret:
break
_, buffer = cv2.imencode('.jpg', frame)
jpg_as_text = buffer.tobytes()
data = (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpg_as_text + b'\r\n')
await response.write(data)
await asyncio.sleep(0.05) # 控制幀率
finally:
cap.release()
return response
```
#### 服務部署與執行
- 啟動伺服器並設定路由,提供命令處理與視頻流傳輸的功能。
```python
async def init_app():
app = web.Application()
controller = CarController()
app.add_routes([web.post('/move', controller.handle_move)])
app.add_routes([web.get('/video_feed', controller.stream_video)])
return app
async def main():
app = await init_app()
await run_server(app)
```
透過上述設定,系統能夠實現對 Arduino 控制車輛的遠程控制及視頻監控,適合應用於遠程監控、教育或娛樂等領域。
#### 原代碼
```python
import asyncio
from aiohttp import web
from aiohttp.web_response import StreamResponse
import serial
import sys
import cv2 # 確保已安裝OpenCV庫
class CarController:
def __init__(self):
self.serial_port = serial.Serial("COM4", 9600, timeout=1)
def process_command(self, command):
print(f"處理 Arduino 命令: {command}")
if self.serial_port.is_open:
try:
self.serial_port.write((command + '\n').encode())
print(f"命令發送到 Arduino: {command}")
except serial.SerialException as e:
print(f"向 Arduino 發送命令失敗: {e}")
else:
print("串行端口未打開。")
def update_console(self, message):
sys.stdout.write(f"\r{message}{' ' * 20}\n")
sys.stdout.flush()
async def handle_move(self, request):
data = await request.json()
command = data.get('command')
if command:
self.update_console(f"命令 '{command}' 已處理。 ")
return web.Response(text=f"命令 '{command}' 已處理。")
return web.Response(text="請求中沒有包含有效的命令。")
async def stream_video(self, request):
response = StreamResponse()
response.content_type = 'multipart/x-mixed-replace; boundary=frame'
await response.prepare(request)
cap = cv2.VideoCapture(0)
if not cap.isOpened():
raise RuntimeError("無法啟動攝像頭。")
try:
while True:
ret, frame = cap.read()
if not ret:
break
_, buffer = cv2.imencode('.jpg', frame)
jpg_as_text = buffer.tobytes()
data = (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + jpg_as_text + b'\r\n')
await response.write(data)
await asyncio.sleep(0.05) # 控制幀率
except Exception as e:
print(e)
finally:
cap.release()
return response
async def init_app():
app = web.Application()
controller = CarController()
app.add_routes([web.post('/move', controller.handle_move)])
app.add_routes([web.get('/video_feed', controller.stream_video)])
return app
async def run_server(app, host="0.0.0.0", port=7024):
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
print(f"伺服器運行在 http://{host}:{port}")
await asyncio.Event().wait()
async def main():
app = await init_app()
await run_server(app)
if __name__ == "__main__":
asyncio.run(main())
```