# Python 爬蟲筆記 [TOC] ## requests ``` pip install requests ``` 1. 最簡單分為 **`.get()`**、**`.post()`** 兩種模式。 2. **`.session()`** 是用來處理 `Cookie` 的模式。 3. 記得關閉 **`resp`**,否則訪問次數過多,可能將對方服務器塞爆,導致回傳錯誤。 ### requests 語法 #### requests.get() ```python= import requests url = "" # 目標網址 resp = requests.get(url) # 發送請求 print(resp.text) # .text 以文本顯示頁面原代碼 ``` :::warning 1. 若網站有反爬 ---> 想辦法偽裝的更像瀏覽器 * 加上 **User-Agent** ( 描述當前的輸出請求是由哪個設備發出的 ) ---> 可按 `F12` 查看 ```python= header = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36" } resp = requests(url, headers=header) # 處理一個小小的反爬 resp.close() ``` ::: :::warning 2. 某些網站可能有特殊驗證方式,而 .get() 也會丟入一次安全驗證,導致請求錯誤。 ```python= resp = requests.get(url, verify=False) # verify=False 去掉安全驗證 ``` ::: :::warning 3. 出現亂碼 ---> 從原代碼中尋找網頁是用何種方式編碼。 ```python= resp.encoding = '' # 解碼 ``` ::: #### requests.post() 1. 必須從網路封包尋找想要的資料。 > 方法 : `F12` -> `Network` -> 從 `Preview` 尋找想要的資料 -> **`複製該資料的 URL`** 。 > * 同時要注意 `Headers 內的 Form Data`。 :::warning ```python=19 print(resp.text) #出現亂碼 print(resp.json()) #將返回內容處理成 json() ``` 使用 **`.text`** 可能會出現亂碼的形式,可以使用 **`.json()`** 處理。 ::: ```python= ''' 練習 : Request.post() 目標 : 從百度翻譯爬取翻譯內容 ''' import requests url = "https://fanyi.baidu.com/sug" s = input("輸入想要翻譯的英文單詞 ") dat = { # Form Data "kw": s } #發送 post 請求,數據必須放在字典中,通過 data 進行傳遞 resp = requests.post(url, data = dat) resp.close() print(resp.text) #出現亂碼 print(resp.json()) #將返回內容處理成 json() ``` #### 下載內容 1. 想要下載目標 URL 的內容只需要在請求的最後加上 **`.content`** 就可以了。 ```python= with open("xxx.mp4", mode="wb") as f : f.write(requests.get(URL).content) ``` ## re ( Regular Expression 正則表達式 ) ### re 使用方式 * 常用元字符 : ```python= . 匹配 "換行符以外" 的任意字符 \w 匹配 "字母" "數字" "下底線" \s 匹配任意的 "空白符" ( 空格、換行 ... ) \d 匹配 "數字" # 反義 \W \S \D ## 常用於文字校驗 ^ 匹配 "字符串的開始" $ 匹配 "字符串的結尾" ''' e.g. 校驗電話號碼 ---> s = "0912345678" >> re = "^\d{10}$" # 確保前後都不會有多餘的字符。 ''' # a|b 匹配 "字符a" 或 "字符b" [...] 匹配字符組中的字符 [^...] 字符組 "以外" 的所有字符 ''' [a~zA~Z0~9] # 可匹配到所有字母與數字 ''' ``` * 量詞 : ```c++= * 重複 "零次" 或 "更多次" + 重複 "一次" 或 "更多次" ? 重複 "零次" 或 "一次" {n} 重複 "n次" {n,} 重複 "n次" 或 "更多次" {n,m} 重複 "n次到m次" ``` * 匹配 : ```python= .* 貪婪匹配 .*? 惰性匹配 # 盡可能少的匹配內容 ''' e.g. s = "一起玩遊戲,玩什麼遊戲,什麼遊戲都可以" #>> re = "一起.*遊戲" #>> 一起玩遊戲,玩什麼遊戲,什麼遊戲 #>> re = "一起.*?遊戲" #>> 一起玩遊戲 ''' ``` ### re 語法 * **`re.findall()`** : 匹配字符串中所有符合正則的內容,返回 list。 > 1. 不常用,效率不高。 ```python= import re s = "我的號碼是:12345, 你的號碼是:6789" lst = re.findall(r"\d+", s) print(lst) ``` ``` # output ['12345', '6789'] ``` * **`re.finditer()`** : 匹配字符串中所有符合正則的內容,返回 iterator。 > 1. 效率比 list 高,用的較多。 > 2. 想要拿到 iterator 的內容需要 **`.group()`**。 ```python= import re s = "我的號碼是:12345, 你的號碼是:6789" it = re.finditer(r"\d+", s) for i in it : print(i) print(i.group()) ``` ``` # output <re.Match object; span=(6, 11), match='12345'> 12345 <re.Match object; span=(19, 23), match='6789'> 6789 ``` * **`re.search()`** : 全文匹配,找到一個結果就返回。 > 1. 拿數據需要 **`.group()`** ```python= import re s = "我的號碼是:12345, 你的號碼是:6789" res = re.search(r"\d+", s) print(res.group()) ``` ``` # output 12345 ``` * **`re.match()`** : **"從頭"** 開始匹配。 > 1. 拿數據需要 **`.group()`** > 2. 用的不多。 ```python= import re s1 = "我的號碼是:12345, 你的號碼是:6789" s2 = "12345, 你的號碼是:6789" match1 = re.match(r"\d+", s1) # s1開頭不是數字,沒有找到符合的內容 print(match1.group()) match2 = re.match(r"\d+", s2) print(match2.group()) ``` ```python= # ouptut AttributeError: 'NoneType' object has no attribute 'group' 12345 ``` * **`re.compile()`** : 預加載正則表達式。 > ```python= import re s = "我的號碼是:12345, 你的號碼是:6789" obj = re.compile(r"\d+") # 預加載 lst2 = obj.findall(s) print(lst2) ``` ``` # ouptut ['12345', '6789'] ``` * **`(?P<name>正則)`** : 單獨從正則匹配進一步提取特定內容。 * **`re.S`** : 使 **`.`** 可以匹配 "換行符" ---> 防止匹配斷掉。 ```python= import re s = """ <div class='A'><span id='1'>哥哥</span></div> <div class='B'><span id='2'>姊姊</span></div> <div class='C'><span id='3'>弟弟</span></div> <div class='D'><span id='4'>妹妹</span></div> """ obj = re.compile(r"<div class='.*?><span id='(?P<id>.*?)'>(?P<name>.*?)</span></div>", re.S) result = obj.finditer(s) for i in result : print(i.group("id")) print(i.group("name")) ``` ``` # output 1 哥哥 2 姊姊 3 弟弟 4 妹妹 ``` ## BeautifulSoup ``` pip install bs4 ``` ### BeautifulSoup 語法 * **`.find("標籤", 屬性="值")`** : 找第一個匹配內容 * **`.find_all("標籤", 屬性="值")`** : 找所有匹配內容 > 1. 拿數據需要 **`.text`**。 > 2. **`.get('')`** 可以拿到特定的屬性值。 > 3. 可以使用 **`.find().find_all()`**,將範圍縮得更小。 > ```python= > lst = main_page.find("div", class_="TypeList").find_all("a")` >``` ### 程式碼 :::warning ```python=12 job_list = page.find("div", class_="jobs-wrap") job_list = page.find("div", attrs={"class": "jobs-wrap"}) ``` 兩行是相同意思。 因為 `class` 是 python 的關鍵字,所以改為 **`class_`**。 也可以使用 **`attrs{}`** 將參數放在裡面。 ::: ```python= ''' 目標 : 從 "小雞上工" 爬取任務名稱 ''' import requests from bs4 import BeautifulSoup url = "https://www.chickpt.com.tw/cases" resp = requests.get(url) resp.close() # 先把頁面原代碼處理成 bs 的對象 page = BeautifulSoup(resp.text, "html.parser") # 指定解析器,否則會出現 Warning # 查找數據 #job_list = page.find("div", class_="jobs-wrap") # 兩個方式相同 job_list = page.find("div", attrs={"class": "jobs-wrap"}) # 從 job_list 中繼續匹配內容 job_name = job_list.find_all("h2", class_="job-info-title ellipsis-job-name ellipsis") for i in job_name : print(i.text.strip()) ``` ## xpath ``` pip install lxml ``` 1. 依靠**節點**關係尋找。 2. 可直接 **`F12`** 複製想要位置的 **`Copy XPath`**。 > * 可以直接由**左上箭頭**指定內容。 > >![](https://i.imgur.com/vypfj22.png) ### xpath 語法 * **`etree.xxx`** : 根據加載內容選擇 `( HTML, XML, parse(文件) ... )`。 ```python=22 tree = etree.XML(xml) ``` * **`"/text()"`** : 提取匹配的內容。 ```python=30 result= tree.xpath("/book/name/text()") # /text() 提取內容 ``` * **`"節點//"`** 代表節點之下的所有子孫。 **`"節點/*"`** ---> **`*`** 代表節點之下的任意節點 ( 通配符 )。 ```python=44 # 希望一行 xpath 就能提取 author/div/nick 以及 author/span/nick 內容 result= tree.xpath("/book/author/*/nick/text()") # * 代表 author 的任意節點 print(result) ``` ``` # output ['安靜', '大笨鐘'] ``` ```python=49 # 想要所有 nick 的內容 result= tree.xpath("/book//nick/text()") print(result) ``` ``` # output ['周董', '稻香', '七里香', '晴天', '安靜', '大笨鐘'] ``` * **`節點[#]`** : 可以單獨索引節點的內容。 :::warning xpath 順序是從 **`1`** 開始數的。 ::: ```python=31 # 不想全部都拿,只想要其中的一個 -> []索引 result = tree.xpath("/html/body/ul/li[1]/a/text()") print(result) ``` ``` # output ['百度'] ``` * **`節點[@屬性='']`** : 對屬性作篩選。 ```python=37 # 提取 herf = 'samsung' 的 a 標籤 -> [@屬性=''] result = tree.xpath("/html/body/ol/li/a[@href='samsung']/text()") print(result) ``` ``` # output ['三星'] ``` * **`./`** : 相對查找,從當前位置繼續搜尋。 * **`/@屬性`** : 提取屬性值。 ```python=43 # 遍歷 ul/li result = tree.xpath("/html/body/ul/li") for li in result : # 從每個 li 提取內容 result = li.xpath("./a/text()") # 從 li 中繼續尋找,相對查找 print(result) # 拿到 href 的屬性值 result = li.xpath("./a/@href") # 拿到屬性值 print(result) ``` ``` # output ['百度'] ['https://www.baidu.com/'] ['谷歌'] ['https://www.google.com.tw/'] ['雅虎'] ['https://tw.yahoo.com/'] ``` * **`標籤[position() > #]`** : 提取標籤位置大於 `#` 的內容。 > 有兩種寫法,因為 `xpath` 回傳的是 `list`,所以直接提取 `[1:]` 的部分也可以。 >```python= ># 假設只想要第一個`tr`標籤以後的數據 >result = xpath("./tr")[1:] >result = xpath("./tr[position()>1]") >``` --- ### 程式碼1 ```python= #1. etree. #2. /text() #3. // #4. /* from lxml import etree xml = """ <book> <name>周杰倫</name> <price>100</price> <nick>周董</nick> <author> <nick id="1">稻香</nick> <nick id="2">七里香</nick> <nick class="jay">晴天</nick> <div> <nick>安靜</nick> </div> <span> <nick>大笨鐘</nick> </span> </author> </book> """ #1. etree. -> 選擇檔案類型 tree = etree.XML(xml) result= tree.xpath("/book/name") print(result) #2. /text() -> 提取內容 result= tree.xpath("/book/name/text()") print("提取內容:\n", result) # 提取 author/nick result= tree.xpath("/book/author/nick/text()") print("提取 author/nick:\n", result) #3. // -> 如果連 "安靜"、"大笨鐘" 也想要一起 result= tree.xpath("/book/author//nick/text()") print('如果連 "安靜"、"大笨鐘" 也想要一起:\n', result) #4. /* -> 希望一行 xpath 就能提取 author/div/nick 以及 author/span/nick 內容 result= tree.xpath("/book/author/*/nick/text()") print("希望一行 xpath 就能提取 author/div/nick 以及 author/span/nick 內容:\n", result) # 想要所有 nick 的內容 result= tree.xpath("/book//nick/text()") print("想要所有 nick 的內容:\n", result) ``` ``` # output [<Element name at 0x162f5c47108>] 提取內容: ['周杰倫'] 提取 author/nick: ['稻香', '七里香', '晴天'] 如果連 "安靜"、"大笨鐘" 也想要一起: ['稻香', '七里香', '晴天', '安靜', '大笨鐘'] 希望一行 xpath 就能提取 author/div/nick 以及 author/span/nick 內容: ['安靜', '大笨鐘'] 想要所有 nick 的內容: ['周董', '稻香', '七里香', '晴天', '安靜', '大笨鐘'] ``` --- ### 程式碼2 ```python= #1. [#] #2. [@屬性=''] #3. ./ #4. /@屬性 from lxml import etree xml = """ <html> <head> <meta charset="UTF-8" /> <title>Title</title> </head> <body> <ul> <li><a href="https://www.baidu.com/">百度</a></li> <li><a href="https://www.google.com.tw/">谷歌</a></li> <li><a href="https://tw.yahoo.com/">雅虎</a></li> </ul> <ol> <li><a href="apple">蘋果</a></li> <li><a href="samsung">三星</a></li> </ol> </body> </html> """ tree = etree.XML(xml) # 拿到 <ul> 的內容 result = tree.xpath("/html/body/ul/li/a/text()") print(result) #1. []索引 -> 不想全部都拿,只想要其中的一個 result = tree.xpath("/html/body/ul/li[1]/a/text()") print(result) #2. [@屬性=''] -> 提取 herf = 'samsung' 的 a 標籤 result = tree.xpath("/html/body/ol/li/a[@href='samsung']/text()") print(result) # 遍歷 ul/li result = tree.xpath("/html/body/ul/li") for li in result : #3. ./ -> 從每個 li 提取內容 result = li.xpath("./a/text()") # 從 li 中繼續尋找,相對查找 print(result) #4. /@屬性 -> 拿到 href 的屬性值 result = li.xpath("./a/@href") # 拿到屬性值 print(result) ``` ## 防盜處理 ### 處理 Cookie 1. 一般與 **`登入`** 或 **`校驗`** 有關。 2. 有些網站資訊是需要 **`登入`** 之後才能顯示出來。 3. 瀏覽器是如何記錄登入過程 ? > (1) 用戶登入 ( 帶著用戶名、密碼 ) ---> 服務器回傳給客戶端 `Cookie`。 > (2) 再次發起請求,就可以帶著 `Cookie` 訪問服務器。 > ---> 此時服務器知道 `Cookie` 是誰,就會把相關內容回傳。 > ![](https://i.imgur.com/Mz5Cm36.png) --- #### 程式碼 :::warning 第一次使用 **`.session()`** 請求,得到 `cookie`。 如果要進一步請求獲得想要的內容,必須再一次使用 **`.session()`**,因為只有這個裡面才保有剛剛獲得的 `cookie`。 :warning: 注意不能使用 **`.requests()`**,因為在這裡面不含 `cookie`。 ```python=29 resp2 = session.get(URL) # F12 -> 找到想要的數據 -> 複製URL ``` ::: :::warning 如果真的想用 **`.requests()`** 的話 : ```python= # 兩個獲得的內容相同 resp = session.get(URL) resp = requests.get(URL, headers={"Cookie": cookie}) ``` ::: ```python= ''' 找不到實例,以下為大致框架。 ''' # 1. 登入 -> 得到cookie # 2. 帶著cookie去請求url -> 得到內容 # 將上面兩個操作連起來 # 使用 .session()進行請求 -> 這個過程cookie不會丟失 import requests session = requests.session() data = { # login內的 "Form Data" "loginName": "xxx", "password": "xxx" } # 1. 登入 url = ".../login" # 從登入畫面登入,F12 -> login -> 複製URL resp = session.post(url, data=data) print(resp.text) print(resp.cookies) # 看cookie # 2. 進一步拿數據 ## 只有剛剛的session中才有cookie ## 不能使用 requestes.get()。 resp2 = session.get(URL) # F12 -> 找到想要的數據 -> 複製URL print(resp2.text) ``` ### 處理防盜鏈 1. 有些網站為了反爬,會加上 **`"Referer"`** **溯源**。 > * 尋找方式 : **`F12 -> Network -> 資料 -> Headers -> Request Headers -> Referer`**。 > * 什麼是**溯源** ? >> 假設一般是由 A -> B -> C 依序訪問。 >> 但如果 B 莫名其妙在外面執行,溯源就沒辦法找到 A,而出現錯誤。 > * 所以可以把**溯源**當作是 **`請求的上一級`**。 :::warning :warning: **F12**看到的節點與返回的**頁面原代碼**很多時候是不匹配的,而**F12**看到的才會是我們看到的HTML表現。 > 如果確定了我們想要的訊息不存在頁面原代碼中,就表示它一定是透過**第二次請求**或是其他手段加載訊息。 ::: --- #### 程式碼 :::success **`videoStatus`** 可以由 **`F12 -> Network -> 選擇 XHR`** 方式尋找。 ![](https://i.imgur.com/8RM9QSf.png) ::: :::success * 影片連結兩者間只相差 **`cont-`** 的部分。 **`上`** : `F12箭頭`找到的真正鏈結。 **`下`** : 由 `videoStatus` 找到的鏈結。 > 如果可以想辦法替換就能獲得正確網址。 ![](https://i.imgur.com/SVgg7S3.png) ::: :::success 透過對比可以發現 : 1. **`cont-`** 後的數字可以透過原網址最後的數字獲得。 2. **`srcUrl`** 中間錯誤的數字串其實就是 **`systemTime`**。 ![](https://i.imgur.com/IRDVNrq.png) ::: :::warning 因為 **`resp.text`** 屬於 `string`,無法直接索引到屬性, 所以需使用 **`.json()`** 屬於 `dict`。 ```python=28 #dic = resp.text # string 無法使用['videoInfo']搜尋 dic = resp.json() ``` ::: ```python= ''' 目標 : 下載 "梨視頻" 裡的影片 ''' # 1. 拿到contID # 2. 拿到videoStatus返回的.json -> 取得srcURL # 3. 對srcURL的內容進行修整 # 4. 下載視頻 import requests # 1. 拿到 cont-ID url = "https://www.pearvideo.com/video_1737677" contID = url.split("_")[1] # 切割 url # 2. 拿到videoStatus返回的.json -> 取得srcURL videoStatusURL = f"https://www.pearvideo.com/videoStatus.jsp?contId={contID}&mrd=0.6865075064891424" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36", # 防盜鏈: 溯源 "Referer": url } resp = requests.get(videoStatusURL, headers=headers) # 3. 對srcURL的內容進行修整 #dic = resp.text # string 無法使用['videoInfo']搜尋 dic = resp.json() systemTime = dic['systemTime'] srcUrl = dic['videoInfo']['videos']['srcUrl'] srcUrl = srcUrl.replace(systemTime, f"cont-{contID}") # 修改 srcUrl print(srcUrl) # # 4. 下載視頻 # with open("梨視頻.mp4", mode="wb") as f : # f.write(requests.get(srcUrl).content) ``` ### selenium 反爬 * 有些網站會進行認證,如果 **`window.navigator.webdriver = true`**,表示目前是使用自動化程式進行操作的,如果不處理掉,那我們不管做其他什麼操作都無法通過。 * 那要更改成 **`window.navigator.webdriver = false`** 方式也很簡單,只需要加上下面的程式碼。 ![](https://i.imgur.com/YoHdhoT.png) ![](https://i.imgur.com/6l9vLHj.png) ```python= from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import Options opt = Options() opt.add_argument('--disable-blink-features=AutomationControlled') web = Chrome(options=opt) web.get("https://google.com") ``` ## 代理 1. 如果需要短時間、大量的爬取數據,不到萬不得已**不推薦使用**。 2. 原理 : 透過第三方的機器發送請求。 > * 一般是 `a` 直接向 `b` 發送請求,但是如果短時間大量的搜索的話,可能會被 `b` 伺服器封鎖 ip。 > * 代理 : `a` 先向 **第三方 `d`** 發請求,接著才向 `b` 發請求,這時對於 `b` 來說,就是很多個人前往搜索,很難會發現到背後其實是 `a` 在控制。 >![](https://i.imgur.com/FpPkkF7.png) ### 程式碼 :::warning 代理 IP 怎麼找? ---> 上網搜尋 **免費代理IP**,嘗試找到能用的 IP。 ::: ```python= import requests ip = "IP地址:端口" proxies = { # 根據網址最前面的判斷 ( http、https 、... ) "https": f"https://{ip}" } resp = requests.get("https://www.baidu.com/", proxies=proxies) resp.encoding = "utf-8" print(resp.text) ``` ## [實作 : 網易雲音樂評論](https://hackmd.io/7VECXtzmSpyLYLaZjVQ0pA) ## 多線程 & 多進程 1. 定義 : >* 線程 ( Thread ) : 執行單位 >* 進程 ( Process ) : 資源單位,每個進程都有自己的記憶體空間。 2. 特色 : >* 一個程式至少有一個進程,一個進程至少有一個線程。 ### 程式碼 ( MultiThreading ) 1. **`function`**。 > * 創建 `thread`。 >> 1. 執行任務 **`target=func`**。 >> 2. 如果想要取名給參數,必須傳遞 **`args=元組(tuple)`**。 >> :::warning >> **`元組(tuple)`** 如果只有一個元素,最後必須加 **`,`**,這是規定。 >> ---> **`args=("一號",)`** >> ::: >```python=11 >thread.append(Thread(target=func, args=(i,))) >``` > * 給予狀態 **`thread.start()`**。 >> 1. 表示該線程狀態為可以工作,但具體執行時間由CPU決定。 > * **`thread.join()`** 表示主程式等到子程序結束後,才會接著執行。 ```python= from threading import Thread def func(name): for i in range(300): print(name, i) if __name__ == '__main__': thread = [] for i in range (5): thread.append(Thread(target=func, args=(i,))) # args必須是元組(tuple) thread[i].start() for i in range(300): print("main", i) # 其他程序... for i in range(5): # 等到子程序結束後,才會接著執行 thread[i].join() print("Done!!") ``` 2. 使用 **`class`**。 > * 執行任務寫在 **`def run():`** 裡面,這是**固定的**。 > * 如果要取名則寫在 **`def __init__(self, name)`** 裡面進行初始化。 >```python= >def __init__(self, name): > Thread.__init__(self) > self.name = name >``` >:::warning >:warning: 同樣必須使用 **`thread.start()`** 賦予狀態,如果寫成 **`thread.run()`** 的話,就會變成單線程了。 >::: ```python= from threading import Thread class MyThread(Thread): def __init__(self, name): Thread.__init__(self) self.name = name def run(self): ## 執行函數固定寫run() for i in range(500): print(self.name, i) if __name__ == '__main__': thread = [] for i in range(5): thread.append(MyThread(i)) #thread[i].run(); # 不能寫.run(),不然就變成"單線程"了 thread[i].start() for i in range(500): print("main", i) # 其他程序... for i in range(5): # 等到子程序結束後,才會接著執行 thread[i].join() print("Done!!") ``` ### 程式碼 ( MultuProcessing ) * 寫法與 **`Thread`** 相同,但較少用。 :::danger 在 spyder 中執行可能會出現無法印出 Prcessing 內容的情況。 ::: ```python= from multiprocessing import Process def func(name): for i in range(50000) : print(name, i) if __name__ == '__main__': process = Process(target=func, args=("child",)) process.start() for i in range(5) : print("main", i) ``` ```python= from multiprocessing import Process class MyProcess(Process): def __init__(self, name): Process.__init__(self) self.name = name def run(self): ## 執行函數固定寫run() for i in range(50000): print(self.name, i) if __name__ == '__main__': process = MyProcess("child") process.start() for i in range(500) : print("main", i) ``` ### 線程池實作 * **線程池**使用方式 : >```python= >from concurrent.futures import ThreadPoolExecutor # 線程池 ># from concurrent.futures import ProcessPoolExecutor # 進程池 > >with ThreadPoolExecutor(50) as thread : # 建立 50 個線程 > for i in range(1, 500): # 任務數量 > # .submit(任務, 參數) -> 給予thread任務 > thread.submit(func, args) >``` ```python= ''' 目標 : 使用線程池爬取小雞上工任務的名稱、薪水, 最後存入.csv檔。 ''' import requests from concurrent.futures import ThreadPoolExecutor from lxml import etree import time import csv f = open("線程池 - 小雞上工.csv", mode = 'w') csvwriter = csv.writer(f) def one_page(url) : resp = requests.get(url) html = etree.HTML(resp.text) lis = html.xpath("/html/body/div[1]/main/section/div[2]/ul[2]/li") for li in lis: job_name = li.xpath("./a/div[1]/h2/text()")[0] salary = li.xpath("./a/div[1]/p[2]/span[1]/text()")[0] job = [job_name.strip(), salary] csvwriter.writerow(job) print("page = ", url.split("=")[1], job_name.strip(), salary) resp.close() time.sleep(1) if __name__ == '__main__': with ThreadPoolExecutor(50) as thread : # 建立 50 個線程 for i in range(1, 500): # 任務數量 url = f"https://www.chickpt.com.tw/cases?page={i}" thread.submit(one_page, url) # 給予任務 f.close() ``` ## 協程 ( Coroutine ) : 多任務異步操作 :::success 與 `線程` 的差別 ? `協程` 是屬於 **`單線程`**, 微觀上,一個任務一個任務的進行切換,切換條件一般是遇到阻塞狀態。 宏觀上,我們能看到的是多個任務一起進行。 也就是 **多任務異步操作**。 ::: 1. 當程序處於阻塞狀態`( IO, sleep, requests.get()...)`,CPU 並不為我們工作。 2. **`協程`** 在遇到阻塞狀態時,可以選擇性的切換到其他任務上 ( 透過程序切換 )。 ### 語法 * **`async`** : 用來宣告 `function` 有異步功能。 * **`await`** : 掛起任務,用來標記 `Coroutine` 切換暫停和繼續的點。 * **`asyncio.sleep()`** : 類似 `time.sleep()`,但是後者會讓協程停止。 * **`asyncio.create_task()`** : 打包任務。 * **`asyncio.wait(tasks)`** : 把多個協程對象包成一個大的對象。 * **`asyncio.run()`** : 用來運行協程函數。 :::warning 被宣告成 `async` 的函數不能直接使用, 必須使用 **`await(func)`** 或是使用特別的方式 call 他。 ```python= asyncio.run(func) ``` ::: ### 程式碼 :::danger 使用 `spyder` 可能遇到的問題。 ```python= "asyncio.run() cannot be called from a running event loop") RuntimeError: asyncio.run() cannot be called from a running event loop ``` ```python=4 # Spyder 其連線著 IPython 核心, # 而 IPython 核心本身在事件迴圈上執行,但 asyncio 不允許巢狀事件​​迴圈 # 解法一 : 不用 spyder # 解法二 : 在開頭引入模組 import nest_asyncio nest_asyncio.apply() ``` ::: :::warning 撰寫時建議寫一個**協程的主函數**,否則 `main` 中函數太攏長。 ```python=26 async def main(): tasks = [ asyncio.create_task(func1()), # py3.8以後必須自己建立 task asyncio.create_task(func2()), asyncio.create_task(func3()) ] await asyncio.wait(tasks) ``` ::: ```python= import time import asyncio # Spyder 其連線著 IPython 核心, # 而 IPython 核心本身在事件迴圈上執行,但 asyncio 不允許巢狀事件​​迴圈 # 解法一 : 不用 spyder # 解法二 : 在開頭引入模組 import nest_asyncio nest_asyncio.apply() async def func1(): print("1 in") await asyncio.sleep(3) # 記得加 await 掛起任務 print("1 out") async def func2(): print("2 in") await asyncio.sleep(2) print("2 out") async def func3(): print("3 in") await asyncio.sleep(5) print("3 out") async def main(): tasks = [ asyncio.create_task(func1()), # py3.8以後必須自己建立 task asyncio.create_task(func2()), asyncio.create_task(func3()) ] await asyncio.wait(tasks) # 記得也要加 await if __name__ == '__main__': t1 = time.time() asyncio.run(main()) t2 = time.time() print(t2 - t1) ``` ``` # output 1 in 2 in 3 in 2 out 1 out 3 out 5.009197473526001 ``` ### 如何應用到爬蟲 1. 平常用的 **`requests.get()`** 屬於同步,所以需要使用另外的模組來實現異步操作。 ```python= import aiohttp ``` 2. 主架構與上方協程差不多,所以只需學會 **`aiohttp`** 的用法。 > > * 對應關係 : `( aiohttp v.s. requests )` >> * 發送請求 : >> 1. **`session = aiohttp.ClientSession()`** <--> **`requests`**。 >> 2. **`session.get()`** <--> **`requests.get()`**。 >> 3. **`session.post()`** <--> **`requests.post()`**。 >> * 讀取內容 : >> 1. **`resp.content.read()`** <--> **`resp.content`**。 >> 2. **`resp.text()`** <--> **`resp.text`**。 >> 3. **`resp.json()`** <--> **`resp.json()`**。 3. 注意事項 : > * **`async with`** 是固定搭配詞 ---> 可以省去 **`.close`** 的步驟。 > * **`f.write(await resp.content.read())`** 這裡記得需要加 **`await`**。 > ---> 因為前面請求是**異步操作**,所以讀取的時候須加上 **`await`**,等到有東西的時候再回來讀取內容。 > * 文件讀寫也屬於 IO 操作,也會浪費時間,如果也想要使用異步的話 > ---> 使用 **`aiofiles`** 模組,用法參考下個小節 **[ 實作 : 爬取一部小說 ]**。 ```python=14 async def aio_download(url): name = url.rsplit("-", 1)[1] async with aiohttp.ClientSession() as session: # = requests async with session.get(url) as resp: # = requests.get() with open(f"./圖片/{name}", mode="wb") as f: # 非異步,若要異步需使用aiofiles模組 f.write(await resp.content.read()) # 讀取內容是異步的,需要await掛起 print(name, "Done!") ``` #### 使用協程 ```python= import asyncio import aiohttp # 異步版的requests import time import nest_asyncio nest_asyncio.apply() urls = [ "https://image.shutterstock.com/image-photo/graceful-white-swan-swimming-lake-600w-1894137055.jpg", "https://image.shutterstock.com/image-vector/grow-grace-wild-flower-tshirt-600w-1886793193.jpg", "https://image.shutterstock.com/image-illustration/watercolor-hand-drawn-illustration-praying-600w-1920875315.jpg" ] async def aio_download(url): name = url.rsplit("-", 1)[1] async with aiohttp.ClientSession() as session: # = requests async with session.get(url) as resp: # = requests.get() with open(f"./圖片/{name}", mode="wb") as f: # 非異步,若要異步需使用aiofiles模組 f.write(await resp.content.read()) # 讀取內容是異步的,需要await掛起 print(name, "Done!") async def main(): tasks = [] for url in urls : tasks.append(asyncio.create_task(aio_download(url))) await asyncio.wait(tasks) if __name__ == '__main__': t1 = time.time() asyncio.run(main()) t2 = time.time() print(t2 - t1) ``` ``` # output 1894137055.jpg Done! 1920875315.jpg Done! 1886793193.jpg Done! 0.07413244247436523 ``` #### 未使用協程 ```python= import requests import time urls = [ r"https://image.shutterstock.com/image-photo/graceful-white-swan-swimming-lake-600w-1894137055.jpg", r"https://image.shutterstock.com/image-vector/grow-grace-wild-flower-tshirt-600w-1886793193.jpg", r"https://image.shutterstock.com/image-illustration/watercolor-hand-drawn-illustration-praying-600w-1920875315.jpg" ] t1 = time.time() for url in urls : resp = requests.get(url) name = url.rsplit("-", 1)[1] with open(f"./圖片/{name}", mode="wb") as f: f.write(resp.content) print(name, "Done!") t2 = time.time() print(t2 - t1) ``` ``` # output 1894137055.jpg Done! 1886793193.jpg Done! 1920875315.jpg Done! 0.21263480186462402 ``` ### 實作 : 爬取一部小說 1. 總共兩步驟。 > * 請求到小說主頁面,拿到所有章節的鏈結和名稱。 > * 從鏈結中下載內容,儲存到檔案。 2. 思考一件事,哪個步驟應該要使用異步,哪個步驟使用同步就可以了。 > * 首先想要拿到所有章節的鏈結和名稱,其實只需要一次請求就能拿到全部的內容了,所以只需要**同步操作**即可。 > * 想要下載所有內容的話,須要請求到每章節各自的鏈結,之後再將內容儲存到檔案,相當耗時,可以考慮使用**異步操作**。 3. 透過比較可以發現當對象多的時候,使用協程的效率是相當的高。 > `協程 1.44秒` v.s. `非協程 39.67秒`。 :::success **`aiofiles`** 用法 : ```python=32 async with aiofiles.open(f"./小說/{title}.txt", mode="w", encoding="utf-8") as f: await f.write(dic['data']['novel']['content']) ``` ::: #### 使用協程 : 1.438493251800537秒 ```python= ''' 目標 : 從百度小說爬取一部小說 ''' import requests import asyncio import aiohttp import aiofiles import json import time import nest_asyncio nest_asyncio.apply() async def aio_download(book_ID, title, cid) : # https://dushu.baidu.com/api/pc/getChapterContent?data={%22book_id%22:%224315647161%22,%22cid%22:%224315647161|10221395%22,%22need_bookinfo%22:1} # 因為url後段data部分不好處理,所以拉出來給值後,再用json轉回string data = { "book_id": book_ID, "cid": f"4315647161|{cid}", "need_bookinfo":1 } data = json.dumps(data) url = f'https://dushu.baidu.com/api/pc/getChapterContent?data={data}' async with aiohttp.ClientSession() as seesion : async with seesion.get(url) as resp : dic = await resp.json() async with aiofiles.open(f"./小說/{title}.txt", mode="w", encoding="utf-8") as f: await f.write(dic['data']['novel']['content']) async def get_main_page(url, book_ID) : resp = requests.get(url) dic = resp.json() tasks = [] for book_info in dic['data']['novel']['items'][:100] : title = book_info['title'] cid = book_info['cid'] tasks.append(asyncio.create_task(aio_download(book_ID, title, cid))) # 將所有任務包裝完後一起送出 await asyncio.wait(tasks) if __name__ == '__main__' : book_ID = "4315647161" url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":' + book_ID + '}' t1 = time.time() asyncio.run(get_main_page(url, book_ID)) t2 = time.time() print(t2 - t1) ``` ``` # output 1.438493251800537 ``` #### 未使用協程 : 39.67288637161255秒 ```python= ''' 目標 : 從百度小說爬取一部小說 ''' import requests import json import time def download(book_ID, title, cid) : data = { "book_id": book_ID, "cid": f"4315647161|{cid}", "need_bookinfo":1 } data = json.dumps(data) url = f'https://dushu.baidu.com/api/pc/getChapterContent?data={data}' resp = requests.get(url) dic = resp.json() with open(f"./小說/{title}.txt", mode="w", encoding="utf-8") as f: f.write(dic['data']['novel']['content']) if __name__ == '__main__' : book_ID = "4315647161" url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":' + book_ID + '}' t1 = time.time() resp = requests.get(url) dic = resp.json() for book_info in dic['data']['novel']['items'][:100] : title = book_info['title'] cid = book_info['cid'] download(book_ID, title, cid) t2 = time.time() print(t2 - t1) ``` ``` # output 39.67288637161255 ``` ## 爬取影片 1. 一般視頻網站是如何做的? > * 用戶上傳 -> 轉碼 ( 把視頻進行處理, 2k、1080... ) -> 切片處理。 > * 假設一部影片 60G,所以用戶在拉動進度條的時候,只需載入小段小段的影片,而不是一次載入全部影片,這樣不僅是用戶端會非常卡,網站也會非常消耗資源,因為拉動一次就必須消耗 60G 的流量。 > * 因為切片的影片有很多,所以必須有一個文件紀錄**影片播放順序**以及**影片存放路徑** ---> **`M3U文件`** 經過 `utf-8` 編碼後變成 **`M3U8文件`**。 > ![](https://i.imgur.com/9QhSglo.png) ### 簡單爬取影片 ![](https://i.imgur.com/rg1ddyn.png) ```python= # 流程 : # 1. 拿到頁面源代碼 # 2. 從頁面源代碼提取m3u8的url # 3. 下載m3u8 # 4. 讀取m3u8,下載影片 import requests import re # 1. 拿到頁面源代碼 url = "https://www.91kanju.com/vod-play/54812-1-1.html" resp = requests.get(url) # 2. 從頁面源代碼提取m3u8的url obj = re.compile("url: '(?P<m3u8>.*?)',", re.S) m3u8 = obj.search(resp.text).group("m3u8") resp.close() # 3. 下載m3u8 resp2 = requests.get(m3u8) with open("哲仁王后.m3u8", mode="wb") as f : f.write(resp2.content) resp2.close() # 4. 讀取m3u8,下載影片 part = 1 with open("哲仁王后.m3u8", mode="r") as f : for line in f : line = line.strip() # 先去掉空格、空白、換行 if(line[0] == '#') : # 不需要開頭是#的資料,我們只想影片鏈結 continue resp3 = requests.get(line) with open(f"./video/{part}.mp4", mode="wb") as file : file.write(resp3.content) print("part", part, "done") part += 1 resp3.close() ``` ## [實作 : 爬取雲播影片](https://hackmd.io/Es6Ryl6ZRyS1rRRtDccURw?both) ## selenium ### 環境設置 1. 下載 `selenium` ``` pip install selenium ``` 2. 下載瀏覽器驅動器 : [ChromeDriver - WebDriver for Chrome](https://chromedriver.chromium.org/) * 下載與瀏覽器相應的版本。 * 解壓縮後將 `exe檔` 放入自己 `python` 安裝的位置。 ![](https://i.imgur.com/O9qN7rp.png) ### 優劣勢 * 優勢 : > 1. 有些網站使用 `requests` 處理會非常困難,因為網站數據是經過加密的,想要處理的話需要像[網易雲音樂評論](https://hackmd.io/7VECXtzmSpyLYLaZjVQ0pA)一樣,太過麻煩。 > 如果可以把我們的程序連到瀏覽器,讓瀏覽器處理各種複雜的操作,而我們只接受最後的結果,就很完美了。 > :star: 這時就需要用到 `selenium`。 > 2. 原本是個**自動化測試工具** ---> 打開瀏覽器,像人一樣去操作瀏覽器。 > 3. 程式碼邏輯與實際操作瀏覽器非常像。 > 4. 可以從 `selenium` 直接提取網頁上的各種訊息,因為那些訊息對於 `selenium` 來說都是透明的。 * 劣勢 : > 1. 速度慢,需要等當前網站載入完全後,才會開始執行。 > 2. 有些網站的反爬檢測到是 `selenium` 在進行操作時,就不動了。 ### 語法 :::success :arrow_down: 可能用到的模組 :arrow_down: ```python= from selenium.webdriver import Chrome # 基本Chrome操作 from selenium.webdriver.common.action_chains import ActionChains # 滑鼠操作 from selenium.webdriver.common.keys import Keys # 鍵盤操作 from selenium.webdriver.support.select import Select # 處理select標籤 from selenium.webdriver.chrome.options import Options # 配置瀏覽器參數 ``` ::: #### EXAMPLE 1 : 基礎操作 + 讀取內容 + 切換視窗 :::warning 在某些操作之後會需要 `time.sleep()`,因為如果網頁還沒載入完全就想爬取內容,就算程式碼正確,也可能會造成程式無法正確執行。 ::: * **`web = Chrome()`** : 建立一個瀏覽器 * **`web.get()`** : 請求網站。 * **`web.find_element_by_xpath()`** : 尋找一個元素。 **`web.find_elements_by_xpath()`** : 尋找所有元素。 > 還有其他的搜尋方式 : ![](https://i.imgur.com/DSpdodE.png) * **`.click()`** : 點擊。 > 先使用 `find_element_by_xpath()` 定位到按鈕位置,之後使用 `click()` 點擊。 >```python=10 ># 點擊選擇城市 >web.find_element_by_xpath('//*[@id="changeCityBox"]/p[1]/a').click() >``` * **`.send_key()`** : 輸入。 > 在定位到的地方輸入指定的內容。 > 1. `.send_key("XXX")` : 輸入字串。 > 2. `.send_key(Keys.XXX)` : 輸入鍵盤上的`ENTER, SPACE...`。 >```python=13 ># 找到搜尋欄,輸入 xxxx ---> 按下搜索 or ENTER >search = web.find_element_by_xpath('//*[@id="search_input"]').send_keys("python", Keys.ENTER) >``` > * **`.text`** : 取得內容 ( 不用像 `xpath` 一樣在路徑後加上`'/text'` )。 --- :::warning 當 `selenium` 打開 **`新頁面`** 或是 **`iframe`** 時,它本身的視野並不會跟著切換, 必須自己主動切換 `selenium` 的視野。 ::: * **`web.switch_to`** : 切換視窗。 > 1. **`web.switch_to.window()`** : 切換到其他頁面。 > 2. **`web.switch_to.frame()`** : 切換到 `iframe`。 > **`web.switch_to.default_content()`** : 從 `iframe` 切換到主頁面。 > 3. **`web.window_handles[]`** : 記錄著當前瀏覽器的所有視窗。 >```python=11 ># 此時selenium的視野默認並不會自己切換到新頁面 >web.switch_to.window(web.window_handles[-1]) >``` >![](https://i.imgur.com/r6zZKDn.png) >```python=24 ># 2.2 想處理iframe的話,就必須先拿到iframe,然後切換視角,才能拿到數據。 >iframe = web.find_element_by_xpath('//*[@id="player_iframe"]') >web.switch_to.frame(iframe) >``` >```python=30 >web.switch_to.default_content() >``` * **`web.close()`** : 關閉頁面。 ```python= from selenium.webdriver import Chrome from selenium.webdriver.common.keys import Keys import time web = Chrome() # 建立一個瀏覽器 # 1. 如何爬取內容 web.get("https://www.lagou.com/") # 點擊選擇城市 web.find_element_by_xpath('//*[@id="changeCityBox"]/p[1]/a').click() # 找到搜尋欄,輸入 xxxx ---> 按下搜索 or ENTER search = web.find_element_by_xpath('//*[@id="search_input"]').send_keys("python", Keys.ENTER) time.sleep(3) # 提取當前頁面的工作內容 jobs = web.find_elements_by_xpath('//*[@id="s_position_list"]/ul/li') for job in jobs : job_name = job.find_element_by_xpath('./div[1]/div[1]/div[1]/a/h3').text job_company = job.find_element_by_xpath('./div[1]/div[2]/div[1]/a').text job_salary = job.find_element_by_xpath('./div[1]/div[1]/div[2]/div/span').text print(job_name, job_company, job_salary) # 2. 如何切換視窗 # 2.1 點擊工作,出現新頁面 web.find_element_by_xpath('//*[@id="s_position_list"]/ul/li[2]/div[1]/div[1]/div[1]/a/h3').click() # 此時selenium的視野默認並不會自己切換到新頁面 web.switch_to.window(web.window_handles[-1]) content = web.find_element_by_xpath('//*[@id="job_detail"]/dd[2]').text print(content) # 如果想要回到原頁面,就把當前頁面關掉,再切回去就可以了 web.close() web.switch_to.window(web.window_handles[0]) web.get("https://www.91kanju.com/vod-play/541-2-1.html") # 2.2 想處理iframe的話,就必須先拿到iframe,然後切換視角,才能拿到數據。 iframe = web.find_element_by_xpath('//*[@id="player_iframe"]') web.switch_to.frame(iframe) frame_text = web.find_element_by_xpath('//*[@id="sub-frame-error-details"]').text print("iframe:", frame_text) web.switch_to.default_content() main_text = web.find_element_by_xpath('/html/body/div[1]/div/div[1]/div/div/div/div[2]/div[1]').text print("default page:", main_text) ``` --- #### EXAMPLE 2 : 處理 `select` 標籤 >![](https://i.imgur.com/NdpBQS8.png) > 1. 一開始透過 `xpath` 定位到的只是一個節點 `element`,而這個 `element` 有一個特殊性,它是下拉列表,所以我們還需要將它包裝。 >```python=12 ># 下拉列表 >web.get("https://www.endata.com.cn/BoxOffice/BO/Year/index.html") >select_element = web.find_element_by_xpath('//*[@id="OptionDate"]') ># 對元素進行包裝成下拉列表 >select_list = Select(select_element) >``` > > 2. 那要怎麼調整選項? > > ![](https://i.imgur.com/DNHrRBg.png) >> * 可以發現 `select標籤` 下有很多的 `option標籤`,那我們可以用 `for loop` 來獲得所有 `option` 的索引。 >> ---> 取得`option`的長度 **`len(select_list.options)`**。 >```python=18 ># 拿到所有option的索引位置 >for i in range(len(select_list.options)): >``` >> >> * 進行切換 : 總共有三種方式 `index`、`value`、`visible_text`。 >> >> ![](https://i.imgur.com/f2VxaXT.png) >> >```python=18 ># 拿到所有option的索引位置 >for i in range(len(select_list.options)): > select_list.select_by_index(i) > time.sleep(2) > table = web.find_element_by_xpath('//*[@id="TableList"]/table').text > print(table) >``` ```python= from selenium.webdriver import Chrome from selenium.webdriver.support.select import Select web = Chrome() # 下拉列表 web.get("https://www.endata.com.cn/BoxOffice/BO/Year/index.html") select_element = web.find_element_by_xpath('//*[@id="OptionDate"]') # 對元素進行包裝成下拉列表 select_list = Select(select_element) # 拿到所有option的索引位置 for i in range(len(select_list.options)): select_list.select_by_index(i) time.sleep(2) table = web.find_element_by_xpath('//*[@id="TableList"]/table').text print(table) ``` --- #### EXAMPLE 3 : 滑鼠操作 1. 可用來拖拉驗證操作。 :::warning 先定義事件鍊,最後必須 **`.perform()`** 提交事件執行,否則該行代碼並不會動作。 ```python= ActionChains(web).move_to_element_with_offset(to_element, x, y).click().perform() ``` ::: ```python= from selenium.webdriver.common.action_chains import ActionChains click(on_element=None) —— 單擊滑鼠左鍵 click_and_hold(on_element=None) —— 點選滑鼠左鍵,不鬆開 context_click(on_element=None) —— 點選滑鼠右鍵 double_click(on_element=None) —— 雙擊滑鼠左鍵 drag_and_drop(source, target) —— 拖拽到某個元素然後鬆開 drag_and_drop_by_offset(source, xoffset, yoffset) —— 拖拽到某個座標然後鬆開 key_down(value, element=None) —— 按下某個鍵盤上的鍵 key_up(value, element=None) —— 鬆開某個鍵 move_by_offset(xoffset, yoffset) —— 滑鼠從當前位置移動到某個座標 move_to_element(to_element) —— 滑鼠移動到某個元素 move_to_element_with_offset(to_element, xoffset, yoffset) —— 移動到距某個元素(左上角座標)多少距離的位置 release(on_element=None) —— 在某個元素位置鬆開滑鼠左鍵 send_keys(*keys_to_send) —— 傳送某個鍵到當前焦點的元素 send_keys_to_element(element, *keys_to_send) —— 傳送某個鍵到指定元素 perform() —— 執行鏈中的所有動作 ``` --- #### 其他語法 * **`wab.page_source()`** : 取得頁面代碼。 ( 經過瀏覽器數據加載及 js 執行之後的 html 內容 )。 * **`無頭瀏覽器`** : 就是背景執行,只需加上 `opt` 就可以了。 ```python= from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import Options # 無頭瀏覽器(背景執行,不會出現瀏覽器) opt = Options() opt.add_argument("--headless") opt.add_argument("--disable-gpu") web = Chrome(options=opt) ``` * **`反爬`** : > * 有些網站會進行認證,如果 **`window.navigator.webdriver = true`**,表示目前是使用自動化程式進行操作的,如果不處理掉,那我們不管做其他什麼操作都無法通過。 > * 那要更改成 **`window.navigator.webdriver = false`** 方式也很簡單,只需要加上下面的程式碼。 > > ![](https://i.imgur.com/YoHdhoT.png) > > ![](https://i.imgur.com/6l9vLHj.png) ```python= from selenium.webdriver import Chrome from selenium.webdriver.chrome.options import Options opt = Options() opt.add_argument('--disable-blink-features=AutomationControlled') web = Chrome(options=opt) web.get("https://google.com") ``` ## 識別驗證碼 > ....