# Python 爬蟲 - 爬取雲播影片
[TOC]
## 思路
1. 尋找關於影片的資訊。
> * 從頁面源代碼看看有沒有 `<video> 標籤`。
> * 沒有的話再找找看 `.m3u8 的 url`。
> * 還是沒有的話,可以從 `F12` 再點擊影片尋找
>![](https://i.imgur.com/koypE4s.png)
>進入鏈結後查看可以找到我們需要的 `.m3u8文件` 以及 `影片加密的key`。
![](https://i.imgur.com/q8O1GP1.png)
2. 找到了影片資訊後開始觀察。
> * 首先點開第一個 `index.m3u8`。
>> 發現裡面存放的並不是平常的 `.m3u8文件`,而是又指向了另一個 `.m3u8文件`。
>>
> ![](https://i.imgur.com/WxUGyot.png)
> * 所以先去觀察第二個 `index.m3u8`。
>> 透過觀察可以發現第一個 `index.m3u8` 給的 `url` 拼接上
>> 域名 : **`https://video.buycar5.cn`**,就會是第二個 `index.m3u8` 的位置,
>> 也就是**影片真正存放的位置**。
>>
> ![](https://i.imgur.com/TRkAE4G.png)
> ![](https://i.imgur.com/gRNFmwr.png)
3. 觀察最後一個 `key.key`。
>> 可以透過第二個 `index.m3u8` 裡面找到 `key.key` 的位置,進而提取到 `key 值`。
>>
> ![](https://i.imgur.com/dDaL4HV.png)
> ![](https://i.imgur.com/XyE7m3h.png)
## 實作流程
1. 從頁面原代碼中先提取到第一層 `.m3u8` 的 `url`。
2. 下載第一層的 `.m3u8文件` ( 裡面有第二層的 `url` )。
提取到 `url` 之後,再下載第二層的 `.m3u8文件` ( 影片真正存放的位置 )。
3. 透過第二層 `.m3u8文件`下載 `.ts影片`。同時記錄影片順序,方便之後的合併。
4. 下載密鑰,對影片進行解密。
5. 將 `.ts檔` 合併成 `.mp4檔`。
## 實作
1. 簡單的資料提取。
```python=31
def get_m3u8_url(url) :
resp = requests.get(url)
obj = re.compile(r'"link_pre".*?"url":"(?P<m3u8_url>.*?)",', re.S)
m3u8_url = obj.search(resp.text).group("m3u8_url")
# https:\/\/video.buycar5.cn\/20200901\/e4NhpyM5\/index.m3u8 (讀取到的)
# https://video.buycar5.cn/20200901/e4NhpyM5/index.m3u8 (處理過的)
m3u8_url = m3u8_url.replace('\\', '') # 將 \ 處理掉
resp.close()
return m3u8_url
```
2. 下載 `.m3u8文件` 並寫入檔案。
```python=22
def download_m3u8(url, name) :
resp = requests.get(url)
with open(f"{name}.m3u8", mode="wb") as f :
f.write(resp.content)
resp.close()
```
3. 將第二層 `.m3u8文件` 裡的影片網址過濾出來,再**使用協程**進行下載。
:::warning
這部分的 `session` 如果是放到 `download_videos(session, url, name)` 的話需要不斷建立,下載一段影片就建立一次,相當耗時。
所以直接放到 `get_videos_url(videos)`,再當作參數傳遞就可以了。
:::
:::danger
記得 **`aiohttp`** 要寫入內容時,不能只寫 `resp.content`,後面還必須加上 `.read()`。
```python=48
await f.write(await resp.content.read())
```
:::
```python=45
async def download_videos(session, url, name) :
async with session.get(url) as resp :
async with aiofiles.open(f"./影片/{name}", mode="wb") as f :
await f.write(await resp.content.read())
print(f"{name}下載完畢!!")
async def get_videos_url(videos) :
tasks = []
# session 如果丟到 download_videos 裡的話需要不斷建立,所以放在這裡
async with aiohttp.ClientSession() as session :
async with aiofiles.open("second.m3u8", mode="r") as f :
async for line in f :
if(line[0] == '#') :
continue
line = line.strip()
name = line.rsplit('/', 1)[1]
videos.append(name)
task = asyncio.create_task(download_videos(session, line, name))
tasks.append(task)
await asyncio.wait(tasks)
```
4. 先下載密鑰,再**使用協程**對影片進行解密。
>* 如果 `.m3u8文件` 是進行 `AES加密`,但是並沒有給予偏移量,
>默認 **`IV = b"0000000000000000"`**。
>* 模式的部分可能就需要自己嘗試了。
:::warning
同時開啟兩個檔案,一個讀取加密的影片,一個將解密完的影片寫入。
```python=79
async with aiofiles.open(f"./影片/{name}", mode="rb") as f1,\
aiofiles.open(f"./解密/de_{name}", mode="wb") as f2 :
```
:::
```python=69
def get_key(url) :
resp = requests.get(url)
resp.close()
return resp.text
async def video_decode(name, key) :
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
# 一次開起兩個檔案
async with aiofiles.open(f"./影片/{name}", mode="rb") as f1,\
aiofiles.open(f"./解密/de_{name}", mode="wb") as f2 :
data = await f1.read()
await f2.write(aes.decrypt(data))
print(f"{name}解密完成")
async def aio_decode(name, key) :
tasks = []
for i in name :
task = asyncio.create_task(video_decode(i, key))
tasks.append(task)
await asyncio.wait(tasks)
```
5. 使用 **`os 模組`** 進行影片合併
> 合併方式 :
> * **windows** : `copy /b xx1.ts+xx2.ts movie.mp4`
> * **mac** : `cat xx1.ts xx2.ts > movie.mp4`
:::warning
因為檔案過多可能出現命令過長的情況,所以先將影片分段合併,最後再將全部合併成一個。
:::
```python=96
def merge_video(names) :
# 因為命令長度過長,所以分段合併.ts影片
each_video = 300 # 一次合併300個.ts影片
part = 1
for i in range(0, len(names), each_video) :
lst = []
for name in names[i : i+each_video] :
lst.append(f".\解密\de_{name}")
cmd = "+".join(lst)
os.system(f"copy /b {cmd} 越獄{part}.mp4")
print(f"part{part} 合併完成")
part += 1
# 將剛剛的.mp4合併成一個影片
lst = []
for i in range(1,part) :
lst.append(f"越獄{i}.mp4")
cmd = "+".join(lst)
os.system(f"copy /b {cmd} 越獄.mp4")
print("合併完成")
```
## 程式碼
```python=
'''
流程 :
1. 從頁面源代碼中找到 m3u8的文件
2. 下載第一層m3u8文件(指到第二層位置) -> 下載第二層m3u8文件(真正影片存放路徑)
3. 下載影片,紀錄影片順序
4. 下載密鑰,進行解密
5. 合併所有 ts文件成一個 mp4文件
'''
import requests
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
import re
import os
import nest_asyncio
nest_asyncio.apply()
def download_m3u8(url, name) :
resp = requests.get(url)
with open(f"{name}.m3u8", mode="wb") as f :
f.write(resp.content)
resp.close()
def get_m3u8_url(url) :
resp = requests.get(url)
obj = re.compile(r'"link_pre".*?"url":"(?P<m3u8_url>.*?)",', re.S)
m3u8_url = obj.search(resp.text).group("m3u8_url")
# https:\/\/video.buycar5.cn\/20200901\/e4NhpyM5\/index.m3u8 (讀取到的)
# https://video.buycar5.cn/20200901/e4NhpyM5/index.m3u8 (處理過的)
m3u8_url = m3u8_url.replace('\\', '') # 將 \ 處理掉
resp.close()
return m3u8_url
async def download_videos(session, url, name) :
async with session.get(url) as resp :
async with aiofiles.open(f"./影片/{name}", mode="wb") as f :
await f.write(await resp.content.read())
print(f"{name}下載完畢!!")
async def get_videos_url(videos) :
tasks = []
# session 如果丟到 download_videos 裡的話需要開關太多次,所以放在這裡
async with aiohttp.ClientSession() as session :
async with aiofiles.open("second.m3u8", mode="r") as f :
async for line in f :
if(line[0] == '#') :
continue
line = line.strip()
name = line.rsplit('/', 1)[1]
videos.append(name)
task = asyncio.create_task(download_videos(session, line, name))
tasks.append(task)
await asyncio.wait(tasks)
def get_key(url) :
resp = requests.get(url)
resp.close()
return resp.text
async def video_decode(name, key) :
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
# 一次開起兩個檔案
async with aiofiles.open(f"./影片/{name}", mode="rb") as f1,\
aiofiles.open(f"./解密/de_{name}", mode="wb") as f2 :
data = await f1.read()
await f2.write(aes.decrypt(data))
print(f"{name}解密完成")
async def aio_decode(name, key) :
tasks = []
for i in name :
task = asyncio.create_task(video_decode(i, key))
tasks.append(task)
await asyncio.wait(tasks)
def merge_video(names) :
# 因為命令長度過長,所以分段合併.ts影片
each_video = 300 # 一次合併300個.ts影片
part = 1
for i in range(0, len(names), each_video) :
lst = []
for name in names[i : i+each_video] :
lst.append(f".\解密\de_{name}")
cmd = "+".join(lst)
os.system(f"copy /b {cmd} 越獄{part}.mp4")
print(f"part{part} 合併完成")
part += 1
# 將剛剛的.mp4合併成一個影片
lst = []
for i in range(1,part) :
lst.append(f"越獄{i}.mp4")
cmd = "+".join(lst)
os.system(f"copy /b {cmd} 越獄.mp4")
print("合併完成")
def main(url) :
# 1. 從頁面源代碼中找到 m3u8的文件
first_m3u8_url = get_m3u8_url(url)
print("1. finish")
# 2. 下載第一層m3u8文件(指到第二層位置)
download_m3u8(first_m3u8_url, "first")
# 2.1 取得第二層 m3u8_url
with open("first.m3u8", mode="r") as f:
for line in f :
if(line[0] == '#') :
continue
# /20200901/e4NhpyM5/1000kb/hls/index.m3u8 (從第一層獲得的url)
# https://video.buycar5.cn/20200901/e4NhpyM5/1000kb/hls/index.m3u8 (目標拼接成著樣子)
line = line.strip()
second_m3u8_url = "https://video.buycar5.cn" + line
# 2.2 下載第二層m3u8文件(真正影片存放路徑)
download_m3u8(second_m3u8_url, "second")
print("2. finish")
# 3. 下載影片,紀錄影片順序
## 使用協程
videos = []
asyncio.run(get_videos_url(videos))
print("3. finish")
# 4. 下載密鑰,進行解密
# 4.1 拿到密鑰
# 偷懶,應該要自己去提取
key_url = "https://ts1.yuyuangewh.com:9999/20200901/e4NhpyM5/1000kb/hls/key.key"
key = get_key(key_url)
# 4.2 解密
asyncio.run(aio_decode(videos, key))
print("4. finish")
# # 測試用
# videos = []
# with open("second.m3u8", mode="r") as f :
# for line in f :
# if(line[0] == '#') :
# continue
# line = line.strip()
# name = line.rsplit('/', 1)[1]
# videos.append(name)
# 5. 合併所有 ts文件成一個 mp4文件
merge_video(videos)
print("5. finish")
if __name__ == '__main__' :
url = "https://www.yunbtv.com/vodplay/yueyudiyiji-1-1.html"
main(url)
```