# Load standard libraries
import threading, queue, time, csv, datetime, os, random, re, calendar
from collections.abc import Iterable
# Load third-party libraries
import pandas as pd
import requests
from lxml import html
# The `Crawler` class
class Crawler(threading.Thread):
    re_obj = re.compile(r"\n")
    def __init__(self, cid, str_curr_date, urls, p_df, data, plock, llock, dlock):
        """
        Initialize one crawler thread and keep references to the shared resources
        """
        threading.Thread.__init__(self)
        self.cid = cid                      # thread id
        self.str_curr_date = str_curr_date  # the day currently being crawled
        self.urls = urls                    # queue of all news URLs for that day
        self.p_df = p_df                    # in-memory copy of `proxies.csv`
        self.data = data                    # in-memory buffer for `2019XXXX_output.csv`
        self.plock = plock                  # lock for `p_df` (in memory)
        self.llock = llock                  # lock for `2019XXXX_logs.txt` (on disk)
        self.dlock = dlock                  # lock for `data` (in memory)
    def get_proxies(self):
        """
        Return the `proxies` argument (`dict`) for `requests.get()` together with the
        sampled row index (`int`); see the sampling illustration after this class
        """
        # Pick one proxy, weighted by the `weight` column
        proxy = self.p_df.sample(weights='weight')
        idx = proxy.index.values[0]
        ip = proxy['ip'].to_string(index=False).strip()
        port = proxy['port'].to_string(index=False).strip()
        # Build the `proxies` argument for requests.get()
        proxies = {
            'https': f"https://{ip}:{port}"
        }
        return (idx, proxies)
    def write_log(self, url, proxies):
        """
        When a request fails, append a failure record to `2019XXXX_logs.txt`
        """
        dt_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        l = (dt_str, self.cid, url, proxies['https'])
        log_path = f"./logs/{self.str_curr_date}_logs.txt"
        # Write the header row only when the log file does not exist yet
        write_header = not os.path.isfile(log_path)
        with open(log_path, "a", newline='', encoding='utf8') as csvfile:
            csvwriter = csv.writer(csvfile)
            if write_header:
                csvwriter.writerow(("datetime", "cid", "url", "proxy"))
            csvwriter.writerow(l)
    def get_article(self, h, url):
        """
        Parse one news page and extract its category id, article id, tags, and body text
        """
        splited_url = url.split("/")
        # cate_id
        cate_id = splited_url[-2]
        # id
        idx = splited_url[-1]
        # tags
        tags = h.xpath("//div[@id='story_tags']/a/text()")
        tags = " ".join(tags) if len(tags) else ""
        # contents
        contents = ""
        article_body = h.xpath("//div[@id='article_body']")
        if len(article_body):
            article_body = article_body[0]
            # Drop unwanted elements under each <p> (figures, captions, images),
            # including all of their descendants and text
            for elem in article_body.xpath(".//p[text()]//*[name()='figure' or name()='figcaption' or name()='img']"):
                elem.drop_tree()
            # Collect the plain text of every <p> into `contents`
            for p in article_body.xpath(".//p[text()]"):
                contents += p.text_content()
        contents = Crawler.re_obj.sub("", contents)
        return (cate_id, idx, tags, contents)
    def run(self):
        success = 0  # number of consecutive successful requests with the current proxy
        # Iterate over all news URLs of the day
        while not self.urls.empty():
            # Get one news URL; get_nowait() avoids blocking forever when another
            # thread takes the last URL between the empty() check and this call
            try:
                url = self.urls.get_nowait()
            except queue.Empty:
                break
            # Keep retrying until the request succeeds
            while True:
                try:
                    # Re-sample a proxy (by weight) after a failure or after 10 successes
                    if not success or success == 10:
                        with self.plock: idx, proxies = self.get_proxies()
                        success = 0
                    # Send the request
                    r = requests.get(
                        url=url,
                        proxies=proxies,
                        timeout=random.randrange(3, 6)
                    )
                    # On success, bump the proxy's weight (in memory)
                    with self.plock: self.p_df.loc[idx, 'weight'] += 1
                    success += 1
                except requests.RequestException:
                    # On failure, append a record to the log file (on disk)
                    with self.llock: self.write_log(url, proxies)
                    success = 0
                    continue
                # The request succeeded; parse the news page
                h = html.fromstring(r.text)
                d = self.get_article(h, url)
                # Append the parsed article to `data` (in memory)
                with self.dlock:
                    self.data.append(d)
                # Leave the retry loop and fetch the next news URL
                break
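# The sketch below is only an illustration and is never called by the crawler: it shows
# the weighted sampling that `Crawler.get_proxies()` relies on, using a made-up two-row
# proxy table. Only the column names `ip`, `port`, and `weight` are taken from the code
# above; the addresses and weights are assumptions.
def _demo_weighted_proxy_sampling():
    demo_df = pd.DataFrame({
        'ip': ['10.0.0.1', '10.0.0.2'],   # hypothetical proxy addresses
        'port': ['8080', '3128'],
        'weight': [1, 9],                 # the second proxy is sampled ~9x more often
    })
    # Same call pattern as get_proxies(): sample one row, weighted by `weight`
    proxy = demo_df.sample(weights='weight')
    idx = proxy.index.values[0]
    ip = proxy['ip'].to_string(index=False).strip()
    port = proxy['port'].to_string(index=False).strip()
    return idx, {'https': f"https://{ip}:{port}"}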
def get_curr_date():
    """
    Return a `datetime.date` object indicating the last day the crawler has finished
    """
    if os.path.isfile("record.txt"):
        with open("record.txt", 'r') as f:
            d = f.readline()
            curr_date = datetime.datetime.strptime(d, "%Y-%m-%d").date()
    else:
        with open("record.txt", "w", newline='', encoding='utf8') as f:
            curr_date = datetime.date(2018, 12, 31)
            d = curr_date.strftime("%Y-%m-%d")
            f.write(d)
    return curr_date
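# Another illustration (never called): `record.txt` holds a single "YYYY-MM-DD" line,
# which is exactly what get_curr_date() writes on the first run and parses on later runs.
def _demo_record_roundtrip():
    d = datetime.date(2018, 12, 31)
    s = d.strftime("%Y-%m-%d")                                     # "2018-12-31"
    assert datetime.datetime.strptime(s, "%Y-%m-%d").date() == d   # parses back losslessly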
def flatten(nested_lst):
    """
    Given a nested list, return a generator yielding its elements flattened
    """
    for elem in nested_lst:
        # Guard against str/bytes, which are Iterable and would recurse forever;
        # the calendar nesting used here only contains lists and `datetime.date` objects
        if isinstance(elem, Iterable) and not isinstance(elem, (str, bytes)):
            yield from flatten(elem)
        else:
            yield elem
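# Illustration (never called): flatten() collapses arbitrarily nested lists into a flat
# sequence of leaves. The nested list below is made up; in main() the actual input is the
# year calendar produced by `calendar.Calendar().yeardatescalendar(2019)`.
def _demo_flatten():
    assert list(flatten([1, [2, [3, 4]], 5])) == [1, 2, 3, 4, 5]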
def main():
    num_crwlrs = 100                   # total number of crawler threads
    urls = queue.Queue()               # `Queue` holding all news URL strings
    data = []                          # `list` buffering all parsed articles
    p_df = pd.read_csv("proxies.csv")  # load `proxies.csv`
    plock = threading.Lock()           # lock for `p_df` (in memory)
    llock = threading.Lock()           # lock for `2019XXXX_logs.txt` (on disk)
    dlock = threading.Lock()           # lock for `data` (in memory)
    curr_date = get_curr_date()        # `curr_date` records the last completed day
    # Nested list of `datetime.date` objects covering 2019; because weeks are padded,
    # it also contains some late-2018/early-2020 dates and duplicate dates
    cal = calendar.Calendar().yeardatescalendar(2019)
    # Iterate over every day of 2019
    for date in flatten(cal):
        # Skip duplicates and days that have already been crawled
        if date <= curr_date: continue
        # Articles are only crawled up to 2019-12-09
        if date == datetime.date(2019, 12, 10): break
        # Update the current day
        curr_date = date
        str_curr_date = curr_date.strftime("%Y%m%d")
        # Start timing
        t1 = time.time()
        # Load all of the day's news URLs into the queue
        filename = f"./data/{str_curr_date}.csv"
        for url in pd.read_csv(filename)['url'].apply(lambda s: "https://xxx.com" + s):
            urls.put(url)
        # Create and start `num_crwlrs` crawler threads, kept in `crawlers`
        crawlers = []
        for cid in range(1, num_crwlrs+1):
            c = Crawler(cid, str_curr_date, urls, p_df, data, plock, llock, dlock)
            crawlers.append(c)
            c.start()
        # Wait for every crawler thread to finish
        for _ in range(len(crawlers)):
            c = crawlers.pop()
            c.join()
        # Stop timing
        t2 = time.time()
        proc_time = t2 - t1
        # After finishing one day of news, do three things:
        # 1. Write out `data` and reset it to an empty list
        with open(f"./outputs/{str_curr_date}_{proc_time:.0f}s.csv", "a", newline='', encoding='utf8') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(("cate_id", "id", "tags", "contents"))
            csvwriter.writerows(data)
        data = []
        # 2. Write out the current progress (which day is done)
        with open("record.txt", "w", newline='', encoding='utf8') as f:
            d = curr_date.strftime("%Y-%m-%d")
            f.write(d)
        # 3. Write out the updated proxy weights
        p_df.to_csv("proxies.csv", header=True, index=False, encoding='utf8')
        print(f"{str_curr_date} DONE! {proc_time:.0f}s")
if __name__ == "__main__":
    main()
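# Closing illustration (never called): the two kinds of input files the script assumes.
# The layouts are inferred from the column accesses above (`ip`/`port`/`weight` in
# get_proxies(), `url` in main()); the example values and the "/news/story/..." path
# shape are assumptions, not taken from the real site.
def _demo_expected_inputs():
    # proxies.csv: one proxy per row, with a sampling weight that the crawler bumps
    proxies_df = pd.DataFrame({'ip': ['10.0.0.1'], 'port': ['8080'], 'weight': [1]})
    # ./data/20190101.csv: one relative news URL per row; main() prepends "https://xxx.com",
    # and get_article() reads cate_id and id from the last two path segments
    urls_df = pd.DataFrame({'url': ['/news/story/5/12345']})
    return proxies_df, urls_df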