# [python - speed up] How to speed up multiple camera streams simultaneously with multithreading<br>如何使用多執行序加速視訊(相機/錄影)串流 ###### tags: `stream` `multiple camera streams` `thread` `multithread` `線程` `串流` --- ### [python - speed up] - [[python - speed up]利用Concurrency加速你的 Python 程式執行效率](https://hackmd.io/@YungHuiHsu/SJ5EgB5eT) - [[python - speed up] process(程序/進程) 與 thread (執行緒/線程)](https://hackmd.io/@YungHuiHsu/SJCIdO5x6) - [[python - speed up] 多執行緒(multi-threading)的基本概念](https://hackmd.io/@YungHuiHsu/r1C9Akpgp) - [python - speed up] threading 模組的使用(待補) - [[python - speed up] How to speed up multiple camera streams simultaneously with multithreading<br>如何使用多執行序加速視訊(相機/錄影)串流](https://hackmd.io/@YungHuiHsu/BJr5SyYQn) - [python - speed up] multiprocessing模組的各種使用方法(待補) - [[python - speed up] multiprocessing模組的Pool功能使用範例。multiprocessing.Pool Sample Notes](https://hackmd.io/@YungHuiHsu/BJiMZLKxp) --- 以YOLOv7 程式碼為例 - [`yolov7/utils/datasets.py `](https://github.com/WongKinYiu/yolov7/blob/main/utils/datasets.py) ```python=266 class LoadStreams: # multiple IP or RTSP cameras def __init__(self, sources='streams.txt', img_size=640, stride=32): self.mode = 'stream' self.img_size = img_size self.stride = stride if os.path.isfile(sources): with open(sources, 'r') as f: sources = [x.strip() for x in f.read().strip().splitlines() if len(x.strip())] else: sources = [sources] n = len(sources) self.imgs = [None] * n self.sources = [clean_str(x) for x in sources] # clean source names for later for i, s in enumerate(sources): # Start the thread to read frames from the video stream print(f'{i + 1}/{n}: {s}... ', end='') url = eval(s) if s.isnumeric() else s if 'youtube.com/' in str(url) or 'youtu.be/' in str(url): # if source is YouTube video check_requirements(('pafy', 'youtube_dl')) import pafy url = pafy.new(url).getbest(preftype="mp4").url cap = cv2.VideoCapture(url) assert cap.isOpened(), f'Failed to open {s}' w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.fps = cap.get(cv2.CAP_PROP_FPS) % 100 _, self.imgs[i] = cap.read() # guarantee first frame thread = Thread(target=self.update, args=([i, cap]), daemon=True) print(f' success ({w}x{h} at {self.fps:.2f} FPS).') thread.start() print('') # newline # check for common shapes s = np.stack([letterbox(x, self.img_size, stride=self.stride)[0].shape for x in self.imgs], 0) # shapes self.rect = np.unique(s, axis=0).shape[0] == 1 # rect inference if all shapes equal if not self.rect: print('WARNING: Different stream shapes detected. For optimal performance supply similarly-shaped streams.') def update(self, index, cap): # Read next stream frame in a daemon thread n = 0 while cap.isOpened(): n += 1 # _, self.imgs[index] = cap.read() cap.grab() if n == 4: # read every 4th frame success, im = cap.retrieve() self.imgs[index] = im if success else self.imgs[index] * 0 n = 0 time.sleep(1 / self.fps) # wait time def __iter__(self): self.count = -1 return self def __next__(self): self.count += 1 img0 = self.imgs.copy() if cv2.waitKey(1) == ord('q'): # q to quit cv2.destroyAllWindows() raise StopIteration # Letterbox img = [letterbox(x, self.img_size, auto=self.rect, stride=self.stride)[0] for x in img0] # Stack img = np.stack(img, 0) # Convert img = img[:, :, :, ::-1].transpose(0, 3, 1, 2) # BGR to RGB, to bsx3x416x416 img = np.ascontiguousarray(img) return self.sources, img, img0, None def __len__(self): return 0 # 1E12 frames = 32 streams at 30 FPS for 30 years ``` LoadStreams 這個類別可以在程式中使用,用於處理多個 IP 或 RTSP攝影機串流。這個類別具有以下方法和屬性: * `__init__(self, sources='streams.txt', img_size=640, stride=32)`:透過從檔案或列表讀取攝影機來源,建立所需的視訊捕獲物件並為每個攝影機串流啟動一個守護線程,以讀取幀。 * `update(self, index, cap)`:一個方法,為每個攝影機串流運行單獨的線程。它從視訊串流中讀取幀並更新 imgs 列表中對應的索引。 * `__iter__(self)`:初始化迭代計數器並返回對象本身。 * `__next__(self)`:從攝影機串流返回下一批圖像。它對圖像進行letterboxing以維持其宽高比,將圖像堆疊並將其從 BGR 格式轉換為 RGB 格式。如果按下 'q' 鍵,它將關閉所有顯示視窗並引發 StopIteration 錯誤。 * `__len__(self)`:返回串流中的總幀數。在此實現中,它返回一個大數字(1E12)以表示無限數量的幀。 以下詳細解釋class內的各方法 ============================================= ### `__init__()`:透過從檔案或列表讀取攝影機來源,建立所需的視訊捕獲物件並為每個攝影機串流啟動一個守護線程,以讀取幀 - `arguments` - 這個類別的目的是處理多個輸入視訊串流。該方法接受三個參數 `sources、img_size 和 stride`。 - `sources `可以是包含視訊列表的文本文件的文件路徑或單個視訊串流 - `img_size` 是神經網絡所需的輸入圖像大小, - `stride` 是 `letterbox()` 函数調整幀至所需輸入大小的步幅值。 - `source`讀入來源處理 - 如果 `sources` 是文件路徑,該方法將讀取文件並將其內容拆分為列表,並通過刪除任何空白字符來清理該列表。 - 否則,`sources` 將直接添加到列表中。該方法然後初始化變量和每個視訊串流的空列表。 - `thread`線程使用 - 對於每個視訊串流,該方法啟動一個線程來從視訊串流中讀取幀。 - 它打印一個狀態消息,指示正在處理的視訊串流數量以及該串流是否成功打開。 - 如果視訊串流是 YouTube 視訊,該方法導入 `pafy` 和 `youtube_dl` 包,並使用 `pafy.new(url).getbest(preftype="mp4").url` 獲取該串流的最佳質量視訊 url。 - 該方法然後初始化一個 `cv2.VideoCapture` 對象,並驗證它已經成功打開。它從視訊串流中獲取幀尺寸和幀速率信息,並通過調用 `cap.read()` 一次(使用 `_` 忽略boolean返回值)來確保已讀取第一幀。 - 該方法使用 `Thread` 方法為每個視訊串流啟動一個新線程,並使用 `target=self.update` 和 `args=([i, cap])` 參數。這將創建一個新的線程,在後台運行帶有參數 `i` 和 `cap` 的 `update` 方法。`daemon=True` 參數將設置線程在後台運行,即使線程仍在運行,也允許主程序退出。 - 串流資料形狀檢查 - 一旦所有線程都啟動了,該方法通過調用 `np.stack([letterbox(x, self.img_size, stride=self.stride)[0].shape for x in self.imgs], 0)` 將每個串流的 `letterboxed` 形狀堆疊到 numpy 數組中,檢查所有視訊串流是否具有相同的形狀。如果形狀相同,則將 `self.rect` 屬性設置為`True`。 如果不是,就會列印出一條警告資訊,表明檢測到了不同的串流形狀,只有在提供類似形狀的串流時才能實現最佳性能。 ### `__update__()`: - 在 `update` 方法中,它在由 `init` 啟動的每個攝影機串流的單獨線程中運行,以 `1/fps` 的速率從串流中讀取幀,其中 `fps` 是影片串流的幀速率值。 - 它使用 `cap.grab()` 以高效地抓取當前幀而不對其進行解碼。 - 如果 `n == 4` ,則程式碼使用 `cap.retrieve()` 檢索幀,以解碼並返回抓取的幀。 - 如果檢索成功,則將檢索到的幀賦值給 `self.imgs[index] `,否則將其設置為 `self.imgs[index] * 0` 。然後,程式碼在重複此過程之前等待所需的時間。 - 總的來說,這段程式碼片段負責從多個攝影機源的視訊串流中讀取和更新幀,確保所有串流具有相同的形狀以實現最佳性能,並為每個攝影機單獨啟動線程以不阻檔主程式 ### 其他 - `iter()` 方法 - 將變數 `self.count` 初始化為 -1,然後返回 `self`。此方法使該物件可迭代。 - `next()` 方法功能為迭代物件時 - 它將 `self.count` 變數增加 1,創建幀的副本 - 並使用 `cv2.waitKey(1) == ord('q')` 檢查用戶是否按下“q”鍵退出。 - 如果按下“q”鍵,該方法將銷毀所有窗口並引發 `StopIteration` 异常以停止迭代。 - 如果未按下“q”鍵,則依序執行以下步驟 - 該方法將實現 `letterbox()` 以將幀調整為所需的輸入尺寸 - 將幀堆疊成 numpy 數組 - 並將幀從 BGR 格式轉換為 RGB 格式 - 然後,該方法返回一個元組,其中包含: - 原始資料來源(str)、處理過的圖像、原始圖像以及 None。 - `len()` 方法 - 返回值為 0,這表示要處理的幀數已設置為 1E12,相當於 32 個流以每秒 30 幀的速度運行 30 年。這基本上將設置要處理的幀數為無限。