### RasperryPi IOT系統簡章:基於 aiohttp 的車輛控制與視頻流服務 這份簡章介紹了如何使用 Python 的 `aiohttp`、`asyncio`、`OpenCV` 和 `pyserial` 函式庫來建立一個非同步 Web 服務器。該服務器能夠控制通過串列埠連接的 Arduino 控制車輛,並從連接的攝像頭實時傳輸視頻。 #### 功能概述 1. **命令處理**:伺服器提供 `/move` 端點接收 POST 請求,將命令透過串列埠發送至 Arduino 控制的車輛。 2. **視頻流傳輸**:通過 `/video_feed` 端點,伺服器以 MJPEG 格式實時傳輸來自攝像頭的視頻,供用戶實時查看。 #### 主要組件與程式碼說明 1. **CarController 類**: - 使用 `serial` 函式庫來處理與 Arduino 的串行通信。 - 提供命令處理與視頻流的方法。 ```python class CarController: def __init__(self): self.serial_port = serial.Serial("COM4", 9600, timeout=1) def process_command(self, command): print(f"處理 Arduino 命令: {command}") if self.serial_port.is_open: self.serial_port.write((command + '\n').encode()) print(f"命令發送到 Arduino: {command}") else: print("串行端口未打開。") ``` 2. **命令處理方法 (`handle_move`)**: - 解析從 HTTP 請求中接收到的 JSON 數據。 - 將命令發送至連接的 Arduino 裝置。 - 回應客戶端命令已被處理的訊息。 ```python async def handle_move(self, request): data = await request.json() command = data.get('command') if command: self.process_command(command) return web.Response(text=f"命令 '{command}' 已處理。") return web.Response(text="請求中沒有包含有效的命令。") ``` 3. **視頻流傳輸方法 (`stream_video`)**: - 初始化攝像頭並持續讀取影像。 - 將影像編碼為 JPEG 格式並透過 HTTP 傳輸。 ```python async def stream_video(self, request): response = StreamResponse() response.content_type = 'multipart/x-mixed-replace; boundary=frame' await response.prepare(request) cap = cv2.VideoCapture(0) if not cap.isOpened(): raise RuntimeError("無法啟動攝像頭。") try: while True: ret, frame = cap.read() if not ret: break _, buffer = cv2.imencode('.jpg', frame) jpg_as_text = buffer.tobytes() data = (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpg_as_text + b'\r\n') await response.write(data) await asyncio.sleep(0.05) # 控制幀率 finally: cap.release() return response ``` #### 服務部署與執行 - 啟動伺服器並設定路由,提供命令處理與視頻流傳輸的功能。 ```python async def init_app(): app = web.Application() controller = CarController() app.add_routes([web.post('/move', controller.handle_move)]) app.add_routes([web.get('/video_feed', controller.stream_video)]) return app async def main(): app = await init_app() await run_server(app) ``` 透過上述設定,系統能夠實現對 Arduino 控制車輛的遠程控制及視頻監控,適合應用於遠程監控、教育或娛樂等領域。 #### 原代碼 ```python import asyncio from aiohttp import web from aiohttp.web_response import StreamResponse import serial import sys import cv2 # 確保已安裝OpenCV庫 class CarController: def __init__(self): self.serial_port = serial.Serial("COM4", 9600, timeout=1) def process_command(self, command): print(f"處理 Arduino 命令: {command}") if self.serial_port.is_open: try: self.serial_port.write((command + '\n').encode()) print(f"命令發送到 Arduino: {command}") except serial.SerialException as e: print(f"向 Arduino 發送命令失敗: {e}") else: print("串行端口未打開。") def update_console(self, message): sys.stdout.write(f"\r{message}{' ' * 20}\n") sys.stdout.flush() async def handle_move(self, request): data = await request.json() command = data.get('command') if command: self.update_console(f"命令 '{command}' 已處理。 ") return web.Response(text=f"命令 '{command}' 已處理。") return web.Response(text="請求中沒有包含有效的命令。") async def stream_video(self, request): response = StreamResponse() response.content_type = 'multipart/x-mixed-replace; boundary=frame' await response.prepare(request) cap = cv2.VideoCapture(0) if not cap.isOpened(): raise RuntimeError("無法啟動攝像頭。") try: while True: ret, frame = cap.read() if not ret: break _, buffer = cv2.imencode('.jpg', frame) jpg_as_text = buffer.tobytes() data = (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpg_as_text + b'\r\n') await response.write(data) await asyncio.sleep(0.05) # 控制幀率 except Exception as e: print(e) finally: cap.release() return response async def init_app(): app = web.Application() controller = CarController() app.add_routes([web.post('/move', controller.handle_move)]) app.add_routes([web.get('/video_feed', controller.stream_video)]) return app async def run_server(app, host="0.0.0.0", port=7024): runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, host, port) await site.start() print(f"伺服器運行在 http://{host}:{port}") await asyncio.Event().wait() async def main(): app = await init_app() await run_server(app) if __name__ == "__main__": asyncio.run(main()) ```