Multi-threaded Crawler

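Before the code: the script assumes a working directory that already holds a `proxies.csv` with at least the columns `ip`, `port` and `weight`, one `./data/YYYYMMDD.csv` per day with a `url` column of site-relative article paths, and existing `./logs/` and `./outputs/` folders (`record.txt` is created on the first run). A minimal, hypothetical setup for a dry run could look like the sketch below; the proxy addresses and the article path are made up:

```python
import os
import pandas as pd

# Folders the crawler writes into but never creates itself.
for folder in ("data", "logs", "outputs"):
    os.makedirs(folder, exist_ok=True)

# proxies.csv: the columns Crawler.get_proxies() reads; the values are placeholders.
pd.DataFrame({
    "ip": ["10.0.0.1", "10.0.0.2"],
    "port": [8080, 3128],
    "weight": [1, 1],
}).to_csv("proxies.csv", index=False)

# One URL list per day: a `url` column of paths that main() prefixes with "https://xxx.com".
# The path layout (".../<cate_id>/<id>") is a guess based on how get_article() splits the URL.
pd.DataFrame({
    "url": ["/news/story/7331/1234567"],
}).to_csv("./data/20190101.csv", index=False)
```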
```python
# Load standard libraries
import threading, queue, time, csv, datetime, os, random, re, calendar
from collections.abc import Iterable

# Load third-party libraries
import pandas as pd
import requests
from lxml import html


# The `Crawler` class
class Crawler(threading.Thread):
    re_obj = re.compile(r"\n")

    def __init__(self, cid, str_curr_date, urls, p_df, data, plock, llock, dlock):
        """Initialize one crawler thread and keep references to the shared resources."""
        threading.Thread.__init__(self)
        self.cid = cid                      # thread id
        self.str_curr_date = str_curr_date  # the day currently being processed
        self.urls = urls                    # all article URLs of that day
        self.p_df = p_df                    # `proxies.csv` cached in memory
        self.data = data                    # `2019XXXX_output.csv` cached in memory
        self.plock = plock                  # lock for `p_df` (memory)
        self.llock = llock                  # lock for `2019XXXX_logs.txt` (disk)
        self.dlock = dlock                  # lock for `data` (memory)

    def get_proxies(self):
        """Return the index (`int`) of the chosen proxy and the `proxies` argument (`dict`) for `requests.get()`."""
        # Pick one proxy according to its weight
        proxy = self.p_df.sample(weights='weight')
        idx = proxy.index.values[0]
        ip = proxy['ip'].to_string(index=False).strip()
        port = proxy['port'].to_string(index=False).strip()
        # Build the `proxies` argument for requests.get()
        proxies = {'https': f"https://{ip}:{port}"}
        return (idx, proxies)

    def write_log(self, url, proxies):
        """If a request fails, append a failure record to `2019XXXX_logs.txt`."""
        dt_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        l = (dt_str, self.cid, url, proxies['https'])
        log_path = f"./logs/{self.str_curr_date}_logs.txt"
        # Write the header row only if the file does not exist yet
        if os.path.isfile(log_path):
            with open(log_path, "a", newline='', encoding='utf8') as csvfile:
                csvwriter = csv.writer(csvfile)
                csvwriter.writerow(l)
        else:
            with open(log_path, "a", newline='', encoding='utf8') as csvfile:
                csvwriter = csv.writer(csvfile)
                csvwriter.writerow(("datetime", "cid", "url", "proxy"))
                csvwriter.writerow(l)

    def get_article(self, h, url):
        """Parse one article page and extract its category id, article id, tags and contents."""
        splited_url = url.split("/")
        # cate_id
        cate_id = splited_url[-2]
        # id
        idx = splited_url[-1]
        # tags
        tags = h.xpath("//div[@id='story_tags']/a/text()")
        tags = " ".join(tags) if len(tags) else ""
        # contents
        contents = ""
        article_body = h.xpath("//div[@id='article_body']")
        if len(article_body):
            article_body = article_body[0]
            # Drop unwanted elements under <p>, together with their children and text
            for elem in article_body.xpath(".//p[text()]//*[name()='figure' or name()='figcaption' or name()='img']"):
                elem.drop_tree()
            # Collect the plain text of every <p> into `contents`
            for p in article_body.xpath(".//p[text()]"):
                contents += p.text_content()
        contents = Crawler.re_obj.sub("", contents)
        return (cate_id, idx, tags, contents)

    def run(self):
        success = 0  # counts consecutive successful requests
        # Iterate over all article URLs of the day
        while not self.urls.empty():
            # Another thread may empty the queue between empty() and get(),
            # so use a non-blocking get instead of a plain get()
            try:
                url = self.urls.get_nowait()
            except queue.Empty:
                break
            # Keep sending the request until it succeeds
            while True:
                try:
                    # On failure, or after 10 successes, re-pick a proxy by weight
                    if not success or success == 10:
                        with self.plock:
                            idx, proxies = self.get_proxies()
                        success = 0
                    # Send the request
                    r = requests.get(
                        url=url,
                        proxies=proxies,
                        timeout=random.randrange(3, 6)
                    )
                    # On success, bump the proxy's weight (memory)
                    with self.plock:
                        self.p_df.loc[idx, 'weight'] += 1
                    success += 1
                except Exception:
                    # On failure, write a failure record (disk)
                    with self.llock:
                        self.write_log(url, proxies)
                    success = 0
                    continue
                # The request succeeded: parse the article
                h = html.fromstring(r.text)
                d = self.get_article(h, url)
                # Append the parsed article to `data` (memory)
                with self.dlock:
                    self.data.append(d)
                # Leave the retry loop and move on to the next URL
                break
```
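Each thread picks its proxy with pandas' weighted sampling: proxies that keep working get their `weight` bumped, so they are drawn more often, and a thread re-draws after any failure or after ten consecutive successes. A stripped-down sketch of that rotation, using a made-up proxy table with the same `ip`/`port`/`weight` columns `get_proxies()` expects:

```python
import pandas as pd

# Hypothetical proxy table; the real one is loaded from proxies.csv.
p_df = pd.DataFrame({
    "ip":     ["10.0.0.1", "10.0.0.2", "10.0.0.3"],
    "port":   [8080, 3128, 8888],
    "weight": [1, 5, 1],   # higher weight => drawn more often
})

# Weighted draw of a single row, as in Crawler.get_proxies()
proxy = p_df.sample(weights='weight')
idx = proxy.index.values[0]
ip = proxy['ip'].to_string(index=False).strip()
port = proxy['port'].to_string(index=False).strip()
proxies = {'https': f"https://{ip}:{port}"}

# Reward the proxy after a successful request, as in Crawler.run()
p_df.loc[idx, 'weight'] += 1
print(proxies, p_df.loc[idx, 'weight'])
```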
```python
# Helper functions and the main routine
def get_curr_date():
    """Return a `datetime.date` marking the last day the crawler has already finished."""
    if os.path.isfile("record.txt"):
        with open("record.txt", 'r') as f:
            d = f.readline()
            curr_date = datetime.datetime.strptime(d, "%Y-%m-%d").date()
    else:
        with open("record.txt", "w", newline='', encoding='utf8') as f:
            curr_date = datetime.date(2018, 12, 31)
            d = curr_date.strftime("%Y-%m-%d")
            f.write(d)
    return curr_date


def flatten(nested_lst):
    """Take a nested list and yield its elements as a flat generator."""
    for elem in nested_lst:
        if isinstance(elem, Iterable):
            yield from flatten(elem)
        else:
            yield elem


def main():
    num_crwlrs = 100                   # total number of crawler threads
    urls = queue.Queue()               # `Queue` holding all article URL strings
    data = []                          # `list` temporarily holding all parsed articles
    p_df = pd.read_csv("proxies.csv")  # load `proxies.csv`
    plock = threading.Lock()           # lock for `p_df` (memory)
    llock = threading.Lock()           # lock for `logs.txt` (disk)
    dlock = threading.Lock()           # lock for `data` (memory)
    curr_date = get_curr_date()        # `curr_date` records the day the crawl has reached
    # `cal` is a nested list of `datetime.date` objects covering 2019-01-01 to 2019-12-31,
    # with duplicated dates at month boundaries
    cal = calendar.Calendar().yeardatescalendar(2019)

    # Iterate over every day of 2019
    for date in flatten(cal):
        # Skip duplicated dates and days that are already done
        if date <= curr_date:
            continue
        # Articles are only crawled up to 2019-12-09
        if date == datetime.date(2019, 12, 10):
            break
        # Update the day currently being processed
        curr_date = date
        str_curr_date = curr_date.strftime("%Y%m%d")

        # Start timing
        t1 = time.time()

        # Load all of the day's article URLs into the queue
        filename = f"./data/{str_curr_date}.csv"
        for url in pd.read_csv(filename)['url'].apply(lambda s: "https://xxx.com" + s):
            urls.put(url)

        # Create and start `num_crwlrs` crawler threads, kept in `crawlers`
        crawlers = []
        for cid in range(1, num_crwlrs + 1):
            c = Crawler(cid, str_curr_date, urls, p_df, data, plock, llock, dlock)
            crawlers.append(c)
            c.start()

        # Wait for every crawler thread to finish
        for _ in range(len(crawlers)):
            c = crawlers.pop()
            c.join()

        # Stop timing
        t2 = time.time()
        proc_time = t2 - t1

        # After each day's crawl finishes, do three things:
        # 1. Write `data` out and reset it to an empty list
        with open(f"./outputs/{str_curr_date}_{proc_time:.0f}s.csv", "a", newline='', encoding='utf8') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(("cate_id", "id", "tags", "contents"))
            csvwriter.writerows(data)
        data = []
        # 2. Write out how far the crawl has progressed
        with open("record.txt", "w", newline='', encoding='utf8') as f:
            d = curr_date.strftime("%Y-%m-%d")
            f.write(d)
        # 3. Write out the updated proxy weights
        p_df.to_csv("proxies.csv", header=True, index=False, encoding='utf8')
        print(f"{str_curr_date} DONE! {proc_time:.0f}s")


if __name__ == "__main__":
    main()
```
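`calendar.Calendar().yeardatescalendar(2019)` nests dates four levels deep (rows of months, months, weeks, days), pads every month with full weeks, and therefore repeats boundary days and spills into 2018 and 2020. That is why `main()` flattens it and skips anything `<= curr_date`, and why `get_curr_date()` seeds `record.txt` with 2018-12-31. A small standalone illustration (it re-declares `flatten` so it runs on its own):

```python
import calendar
import datetime
from collections.abc import Iterable

def flatten(nested_lst):
    # Same helper as in the script above: recursively yield the non-iterable leaves.
    for elem in nested_lst:
        if isinstance(elem, Iterable):
            yield from flatten(elem)
        else:
            yield elem

cal = calendar.Calendar().yeardatescalendar(2019)
dates = list(flatten(cal))

print(len(dates))     # more than 365: boundary weeks are repeated between months
print(dates[0])       # 2018-12-31, the very value get_curr_date() seeds record.txt with
print(sum(d == datetime.date(2019, 1, 31) for d in dates))  # 2: Jan 31 shows up in both month views

# De-duplicated iteration, mirroring the `date <= curr_date` check in main()
curr = datetime.date(2018, 12, 31)
seen = []
for d in flatten(cal):
    if d <= curr:          # duplicate from an overlapping boundary week, or already done
        continue
    if d.year == 2020:     # the last week of December spills into 2020
        break
    curr = d
    seen.append(d)
print(len(seen), seen[0], seen[-1])  # 365 2019-01-01 2019-12-31
```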