# Python 爬蟲 - 爬取雲播影片 [TOC] ## 思路 1. 尋找關於影片的資訊。 > * 從頁面源代碼看看有沒有 `<video> 標籤`。 > * 沒有的話再找找看 `.m3u8 的 url`。 > * 還是沒有的話,可以從 `F12` 再點擊影片尋找 >![](https://i.imgur.com/koypE4s.png) >進入鏈結後查看可以找到我們需要的 `.m3u8文件` 以及 `影片加密的key`。 ![](https://i.imgur.com/q8O1GP1.png) 2. 找到了影片資訊後開始觀察。 > * 首先點開第一個 `index.m3u8`。 >> 發現裡面存放的並不是平常的 `.m3u8文件`,而是又指向了另一個 `.m3u8文件`。 >> > ![](https://i.imgur.com/WxUGyot.png) > * 所以先去觀察第二個 `index.m3u8`。 >> 透過觀察可以發現第一個 `index.m3u8` 給的 `url` 拼接上 >> 域名 : **`https://video.buycar5.cn`**,就會是第二個 `index.m3u8` 的位置, >> 也就是**影片真正存放的位置**。 >> > ![](https://i.imgur.com/TRkAE4G.png) > ![](https://i.imgur.com/gRNFmwr.png) 3. 觀察最後一個 `key.key`。 >> 可以透過第二個 `index.m3u8` 裡面找到 `key.key` 的位置,進而提取到 `key 值`。 >> > ![](https://i.imgur.com/dDaL4HV.png) > ![](https://i.imgur.com/XyE7m3h.png) ## 實作流程 1. 從頁面原代碼中先提取到第一層 `.m3u8` 的 `url`。 2. 下載第一層的 `.m3u8文件` ( 裡面有第二層的 `url` )。 提取到 `url` 之後,再下載第二層的 `.m3u8文件` ( 影片真正存放的位置 )。 3. 透過第二層 `.m3u8文件`下載 `.ts影片`。同時記錄影片順序,方便之後的合併。 4. 下載密鑰,對影片進行解密。 5. 將 `.ts檔` 合併成 `.mp4檔`。 ## 實作 1. 簡單的資料提取。 ```python=31 def get_m3u8_url(url) : resp = requests.get(url) obj = re.compile(r'"link_pre".*?"url":"(?P<m3u8_url>.*?)",', re.S) m3u8_url = obj.search(resp.text).group("m3u8_url") # https:\/\/video.buycar5.cn\/20200901\/e4NhpyM5\/index.m3u8 (讀取到的) # https://video.buycar5.cn/20200901/e4NhpyM5/index.m3u8 (處理過的) m3u8_url = m3u8_url.replace('\\', '') # 將 \ 處理掉 resp.close() return m3u8_url ``` 2. 下載 `.m3u8文件` 並寫入檔案。 ```python=22 def download_m3u8(url, name) : resp = requests.get(url) with open(f"{name}.m3u8", mode="wb") as f : f.write(resp.content) resp.close() ``` 3. 將第二層 `.m3u8文件` 裡的影片網址過濾出來,再**使用協程**進行下載。 :::warning 這部分的 `session` 如果是放到 `download_videos(session, url, name)` 的話需要不斷建立,下載一段影片就建立一次,相當耗時。 所以直接放到 `get_videos_url(videos)`,再當作參數傳遞就可以了。 ::: :::danger 記得 **`aiohttp`** 要寫入內容時,不能只寫 `resp.content`,後面還必須加上 `.read()`。 ```python=48 await f.write(await resp.content.read()) ``` ::: ```python=45 async def download_videos(session, url, name) : async with session.get(url) as resp : async with aiofiles.open(f"./影片/{name}", mode="wb") as f : await f.write(await resp.content.read()) print(f"{name}下載完畢!!") async def get_videos_url(videos) : tasks = [] # session 如果丟到 download_videos 裡的話需要不斷建立,所以放在這裡 async with aiohttp.ClientSession() as session : async with aiofiles.open("second.m3u8", mode="r") as f : async for line in f : if(line[0] == '#') : continue line = line.strip() name = line.rsplit('/', 1)[1] videos.append(name) task = asyncio.create_task(download_videos(session, line, name)) tasks.append(task) await asyncio.wait(tasks) ``` 4. 先下載密鑰,再**使用協程**對影片進行解密。 >* 如果 `.m3u8文件` 是進行 `AES加密`,但是並沒有給予偏移量, >默認 **`IV = b"0000000000000000"`**。 >* 模式的部分可能就需要自己嘗試了。 :::warning 同時開啟兩個檔案,一個讀取加密的影片,一個將解密完的影片寫入。 ```python=79 async with aiofiles.open(f"./影片/{name}", mode="rb") as f1,\ aiofiles.open(f"./解密/de_{name}", mode="wb") as f2 : ``` ::: ```python=69 def get_key(url) : resp = requests.get(url) resp.close() return resp.text async def video_decode(name, key) : aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC) # 一次開起兩個檔案 async with aiofiles.open(f"./影片/{name}", mode="rb") as f1,\ aiofiles.open(f"./解密/de_{name}", mode="wb") as f2 : data = await f1.read() await f2.write(aes.decrypt(data)) print(f"{name}解密完成") async def aio_decode(name, key) : tasks = [] for i in name : task = asyncio.create_task(video_decode(i, key)) tasks.append(task) await asyncio.wait(tasks) ``` 5. 使用 **`os 模組`** 進行影片合併 > 合併方式 : > * **windows** : `copy /b xx1.ts+xx2.ts movie.mp4` > * **mac** : `cat xx1.ts xx2.ts > movie.mp4` :::warning 因為檔案過多可能出現命令過長的情況,所以先將影片分段合併,最後再將全部合併成一個。 ::: ```python=96 def merge_video(names) : # 因為命令長度過長,所以分段合併.ts影片 each_video = 300 # 一次合併300個.ts影片 part = 1 for i in range(0, len(names), each_video) : lst = [] for name in names[i : i+each_video] : lst.append(f".\解密\de_{name}") cmd = "+".join(lst) os.system(f"copy /b {cmd} 越獄{part}.mp4") print(f"part{part} 合併完成") part += 1 # 將剛剛的.mp4合併成一個影片 lst = [] for i in range(1,part) : lst.append(f"越獄{i}.mp4") cmd = "+".join(lst) os.system(f"copy /b {cmd} 越獄.mp4") print("合併完成") ``` ## 程式碼 ```python= ''' 流程 : 1. 從頁面源代碼中找到 m3u8的文件 2. 下載第一層m3u8文件(指到第二層位置) -> 下載第二層m3u8文件(真正影片存放路徑) 3. 下載影片,紀錄影片順序 4. 下載密鑰,進行解密 5. 合併所有 ts文件成一個 mp4文件 ''' import requests import asyncio import aiohttp import aiofiles from Crypto.Cipher import AES import re import os import nest_asyncio nest_asyncio.apply() def download_m3u8(url, name) : resp = requests.get(url) with open(f"{name}.m3u8", mode="wb") as f : f.write(resp.content) resp.close() def get_m3u8_url(url) : resp = requests.get(url) obj = re.compile(r'"link_pre".*?"url":"(?P<m3u8_url>.*?)",', re.S) m3u8_url = obj.search(resp.text).group("m3u8_url") # https:\/\/video.buycar5.cn\/20200901\/e4NhpyM5\/index.m3u8 (讀取到的) # https://video.buycar5.cn/20200901/e4NhpyM5/index.m3u8 (處理過的) m3u8_url = m3u8_url.replace('\\', '') # 將 \ 處理掉 resp.close() return m3u8_url async def download_videos(session, url, name) : async with session.get(url) as resp : async with aiofiles.open(f"./影片/{name}", mode="wb") as f : await f.write(await resp.content.read()) print(f"{name}下載完畢!!") async def get_videos_url(videos) : tasks = [] # session 如果丟到 download_videos 裡的話需要開關太多次,所以放在這裡 async with aiohttp.ClientSession() as session : async with aiofiles.open("second.m3u8", mode="r") as f : async for line in f : if(line[0] == '#') : continue line = line.strip() name = line.rsplit('/', 1)[1] videos.append(name) task = asyncio.create_task(download_videos(session, line, name)) tasks.append(task) await asyncio.wait(tasks) def get_key(url) : resp = requests.get(url) resp.close() return resp.text async def video_decode(name, key) : aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC) # 一次開起兩個檔案 async with aiofiles.open(f"./影片/{name}", mode="rb") as f1,\ aiofiles.open(f"./解密/de_{name}", mode="wb") as f2 : data = await f1.read() await f2.write(aes.decrypt(data)) print(f"{name}解密完成") async def aio_decode(name, key) : tasks = [] for i in name : task = asyncio.create_task(video_decode(i, key)) tasks.append(task) await asyncio.wait(tasks) def merge_video(names) : # 因為命令長度過長,所以分段合併.ts影片 each_video = 300 # 一次合併300個.ts影片 part = 1 for i in range(0, len(names), each_video) : lst = [] for name in names[i : i+each_video] : lst.append(f".\解密\de_{name}") cmd = "+".join(lst) os.system(f"copy /b {cmd} 越獄{part}.mp4") print(f"part{part} 合併完成") part += 1 # 將剛剛的.mp4合併成一個影片 lst = [] for i in range(1,part) : lst.append(f"越獄{i}.mp4") cmd = "+".join(lst) os.system(f"copy /b {cmd} 越獄.mp4") print("合併完成") def main(url) : # 1. 從頁面源代碼中找到 m3u8的文件 first_m3u8_url = get_m3u8_url(url) print("1. finish") # 2. 下載第一層m3u8文件(指到第二層位置) download_m3u8(first_m3u8_url, "first") # 2.1 取得第二層 m3u8_url with open("first.m3u8", mode="r") as f: for line in f : if(line[0] == '#') : continue # /20200901/e4NhpyM5/1000kb/hls/index.m3u8 (從第一層獲得的url) # https://video.buycar5.cn/20200901/e4NhpyM5/1000kb/hls/index.m3u8 (目標拼接成著樣子) line = line.strip() second_m3u8_url = "https://video.buycar5.cn" + line # 2.2 下載第二層m3u8文件(真正影片存放路徑) download_m3u8(second_m3u8_url, "second") print("2. finish") # 3. 下載影片,紀錄影片順序 ## 使用協程 videos = [] asyncio.run(get_videos_url(videos)) print("3. finish") # 4. 下載密鑰,進行解密 # 4.1 拿到密鑰 # 偷懶,應該要自己去提取 key_url = "https://ts1.yuyuangewh.com:9999/20200901/e4NhpyM5/1000kb/hls/key.key" key = get_key(key_url) # 4.2 解密 asyncio.run(aio_decode(videos, key)) print("4. finish") # # 測試用 # videos = [] # with open("second.m3u8", mode="r") as f : # for line in f : # if(line[0] == '#') : # continue # line = line.strip() # name = line.rsplit('/', 1)[1] # videos.append(name) # 5. 合併所有 ts文件成一個 mp4文件 merge_video(videos) print("5. finish") if __name__ == '__main__' : url = "https://www.yunbtv.com/vodplay/yueyudiyiji-1-1.html" main(url) ```