--- tags: tutorials title: Multi Threading description: no desc # type: slide # slideOptions: #theme: night # spotlight: # enabled: false --- {%hackmd BJrTq20hE %} ###### [什麼!你想問爬蟲!?](https://www.learncodewithmike.com/2020/02/python-beautifulsoup-web-scraper.html) # Multi Threading ## 什麼是 Multi Threading(多執行緒)? Multi Threading 的使用主要用在加速程式執行,它可以創建多條執行緒(Thread),電腦再將這些執行緒累積的工作排入CPU 的 Process之中處理,在寫爬蟲程式的時候常常會有傳送要求(requests)給對方主機並等待對方回應(response)的閒置時間,這段時間電腦通常只是等待,並不會做任何工作,這個時候就是使用 Multi Threading 的好時機。 ### Serial Processing vs Multi Threading 一般寫 python 程式時的寫法,是會按照設計的流程做成序列(serial)執行的程式(Fig 1),電腦每個process在一個時間只做一件事,做完才能塞下一個工作,這樣就是只用到一個Thread。 <center> <img src="https://i.imgur.com/9qIO4Ka.png" height="300" > Fig 1.serial program </center> 但是當今天有一長串清單的要求要取得時,每個要求都要等待收到回覆,這樣會花費太多時間在等待上面(這種效能限制叫做 IO-bound),不僅沒效率使用者也會等到不耐煩。使用 Multi Threading 則可以在 process 等待時填入其他工作,如 Fig 2. 所示。 <center> <img src="https://i.imgur.com/ys9hFdU.png" height="400"> Fig 2.Multi Threading </center> ### Multi Threading vs Multi Processing 簡單來說 Multi Threading 就是同時開好幾條序列執行的工作流程來想辦法增加 CPU 使用率,如果今天做的是像計算模擬那樣 CPU 工作全部都被排滿的情形就不適用。Multi Processing 則是做多核心平行運算,可以用來增強像是計算模擬這類型運算的效率。 ## Multi Threading with Python 我以爬取 NIST 網站上的化合物圖片的程式碼來說明如何使用 Multi Threading ### Serial 這組 code 並沒有使用 Multi Threading ,所有程式碼按照一般流程由上至下執行 ``` import time import requests #target chemical's CAS number chem_list = ['74828','74873','75092','67663','56235'] start_time = time.time() # get all chemical's picture in the list for chem in chem_list: re = requests.get('https://webbook.nist.gov/cgi/cbook.cgi?Struct=C'+chem+'&Type=Color') #print if the response is good or bad ,200 means it's good print(re) end_time = time.time() print('total time: %s secs'%(end_time - start_time)) ``` 執行結果: ``` <Response [200]> <Response [200]> <Response [200]> <Response [200]> <Response [200]> total time: 4.3146514892578125 secs ``` ### Multi Threading 接下來讓我們 import threading 這個模組並改寫程式,加入 multi threading(建議有OOP基礎才比較好讀懂code): ``` import threading import time import requests #target chemical's CAS number chem_list = ['74828','74873','75092','67663','56235'] start_time = time.time() # function defined for get chemical's picture def get_chemcial_picture(cas) : re = requests.get('https://webbook.nist.gov/cgi/cbook.cgi?Struct=C' +cas+ '&Type=Color') #print if the response is good or bad ,200 means it's good print(re) return re for chem in chem_list: # construct an instance of Thread class ,assign task to it t = threading.Thread(target=get_chemcial_picture,args=(chem,)) # make this thread start to work t.start() end_time = time.time() print('total time: %s secs'%(end_time - start_time)) ``` 執行結果: ``` total time: 0.0029921531677246094 secs <Response [200]> <Response [200]> <Response [200]> <Response [200]> <Response [200]> ``` 執行結果並不是很理想,因為當你執行完把任務交到各個 Thread 並啟動任務後,就接著跑原本該是在response全部回來後才該執行的計算花費時間指令。 ### join() function in Threading 為了不讓設計好的流程亂掉,我們必須在適當的位置加上 join() 這個函數,使用這個函數將特定的 Thread 加入觀察名單,只有名單上所有的 Thread 內的工作都被做完,join()後面的指令才被執行: ``` import threading import time import requests #target chemical's CAS number chem_list = ['74828','74873','75092','67663','56235'] start_time = time.time() # function defined for get chemical's picture def get_chemcial_picture(cas) : re = requests.get('https://webbook.nist.gov/cgi/cbook.cgi?Struct=C' +cas+ '&Type=Color') #print if the response is good or bad ,200 means it's good print(re) return re thread_list = [] for chem in chem_list: # construct an instance of Thread class ,assign task to it t = threading.Thread(target=get_chemcial_picture,args=(chem,)) thread_list.append(t) # make this thread start to work t.start() # join every thread to wait until every thread's work is done for t in thread_list: t.join() end_time = time.time() print('total time: %s secs'%(end_time - start_time)) ``` 執行結果: ``` <Response [200]> <Response [200]> <Response [200]> <Response [200]> <Response [200]> total time: 0.8747215270996094 secs ``` 可以看到執行結果較 Serial 的時間花費少了好幾倍。 ### 使用佇列(Queue)來防止被 Ban ###### [什麼是佇列](http://alrightchiu.github.io/SecondRound/queue-introjian-jie-bing-yi-linked-listshi-zuo.html) 看起來使用 Multi Threading 來改 Python code 的確節省了很多時間,但是還是有一個隱憂。當今天需求量大增時,如果一次送出上千上萬筆要求到特定網站的話,這種行為叫做阻斷服務攻擊(DOS),原則上會被 Ban 掉。 為了避免這樣的問題,在一些的爬蟲程式設計時有的人會使用 Time 裡面的 sleep() 函數來阻止程式在過短的時間內重複去造訪特定網站: ``` import time #let current process to wait 5 seconds time.sleep(5) ``` 但如果網站可以接受使用者用 Multi Threading 來抓他的資料的話,應該是不太需要擔心會在只傳送幾個要求時就被 Ban ,但就像前面說的,直接發送大量要求還是有風險的,這個時候可以使用 Python 裡面已經內建好的 Queue 來管理工作 Python 裡面的 Queue 已經有針對 Multi Threading 做了些特化,可以很好的處理 Multi Threading 管理上的需求。如以下程式所示,我們轉而使用 Queue 來存取資料,並運用 Queue 的特性來管理每個 Thread 的工作(Thread 手上沒工作時才去找 Queue 索取新工作,Queue內部存的資料一被取出就離開 Queue 了,當 Queue 內部資料數為零時則全部工作完成),並且直接使用 queue.join() 就可以達到讓 Queue 所有資料都處理完才執行下一步程式的功能,非常的方便 ``` import threading import time import requests import queue # function defined for get chemical's picture def get_chemcial_picture(cas) : re = requests.get('https://webbook.nist.gov/cgi/cbook.cgi?Struct=C' +cas+ '&Type=Color') #print if the response is good or bad ,200 means it's good print(re) return re thread_list = [] # define the number of threads, it can be different from number of tasks MultiThreadNumber = 3 # define a class to inherit Thread class class Worker(threading.Thread): def __init__(self, queue,lock): threading.Thread.__init__(self) self.queue = queue self.lock=lock def run(self): while self.queue.qsize() > 0: data = self.queue.get() get_chemcial_picture(data) #self.lock.acquire() #self.lock.release() self.queue.task_done() #target chemical's CAS number chem_list = ['74828','74873','75092','67663','56235'] start_time = time.time() data_queue = queue.Queue() diction_lock = threading.Lock() for chem in chem_list: data_queue.put(chem) for i in range(MultiThreadNumber): t = Worker(queue=data_queue,lock=diction_lock) thread_list.append(t) # make this thread start to work t.start() # lock process unitl all element in the queue were processed data_queue.join() # join every thread to wait until every thread's work is done for t in thread_list: t.join() end_time = time.time() print('total time: %s secs'%(end_time - start_time)) ``` 執行結果: ``` <Response [200]> <Response [200]> <Response [200]> <Response [200]> <Response [200]> total time: 1.9111742973327637 secs ``` ### 使用 threading.Lock() 來確保資料存取的穩定性 當使用 Multi Threading 時 所有的 Thread 都可以存取當前程式執行區段的區域變數,所以如果要存取資料需要特別注意。 在上面程式碼中的 Worker 裡的 run() 函數中有使用 self.lock.acquire() 及 self.lock.release() ,這兩個函數可以確保當某個 Thread 進入 self.lock.acquire()下面的程式碼區段時,其他擁有相同工作的 Thread 會被擋住暫緩執行,直到這個 Thread 的 self.lock.release() 被執行後其他 Thread 才能輪流執行這一段程式,這些功能通常是用在一些需要進行資料存取時確保不會出問題的方法。