Python多任務 === KeyWord - 時間片輪轉 - 優先級調度 - 調度算法 - 併行v.s.併發 進程 --- - 進程 vs 程序 - 運行代碼前-> 程序 - 運行代碼時-> 進程 - fork() - Win 無法使用 ```python import os ret = os.fork() print("haha") ``` ``` haha haha # 兩個? ``` ```python import os import time ret = os.fork() if ret == 0: while True: print("--1--") time.sleep(1) else: while True: print("--2--") time.sleep(1) # 同時出現--1-- & --2-- # 說明: # 執行程序時會有個主進程開始執行程序 # 執行至os.fork()時, os會創建另一個進程(子進程), 並複製主(父)進程至子中 # 接著父子進程皆會得到返回值, 子進程必為0, 父進程是子進程的pid # 接著同時往下判斷, 子進程進入--1--迴圈, 父進程進入--2--迴圈 ``` ```$ --2-- --1-- --1-- --2-- --1-- --2-- --2-- --1-- # 問題. 父進程先還是子進程先? # 不一定, 視os算法而定 ``` - getpid()、getppid() ```python import os ret = os.fork() print(ret) if ret>0: print("父進程:%d"%os.getpid()) else: print("子進程:%d,%d"%(os.getpid(), os.getppid())) # pid 查看 # $ ps aux # ps: Process Status # a: all # u: user # x: processes w/o controlling ttys # getppid: parent process ID # pid 知識: # 最大值65535(2^16-1)-> 最大運行66535個進程 # 進程號不允許相同 ``` ```$ 7841 父進程:7840 0 子進程:7841,7840 # 此結果說明主進程創建子進程時, 便會得到子進程的pid # 而7840創建了7841 ``` - 進程先後順序 ```python import os import time ret = os.fork() if ret ==0: print("子進程") #time.sleep(1) time.sleep(5) print("子進程over") else: print("父進程") time.sleep(3) print("over") # $ 以主進程為依據, 主進程結束即跳出, 不會待子進程結束才結束 # 主進程與子進程各自會執行向下的所有代碼, 故over出現了兩次 ``` ```$ 父進程 子進程 over # 3sec後出現 $ 子進程over # $(3sec後出現); 子進程over(5sec後出現) over # 5sec後出現 # print預設下一行, 如不要下一行:print("",end="") ``` - 進程間數據不共享 ```python import os import time num = 100 ret = os.fork() if ret ==0: print("1") num +=1 print("1, num=%d"%num) else: time.sleep(3) print("2") print("2, num=%d"%num) ``` ```$ 1 1, num=101 2 2, num=100 ``` - 進程間的通信 - 同一台電腦 - PIPE & FIFO - signal - 消息對列 - 共享內存 - 信號量 - socket - ... - 不同電腦: 網路 -多個fork ```python import os # 父進程 ret = os.fork() if ret ==0: # 子進程 print("1") else: # 父進程 print("2") # 父子進程 ret = os.fork() if ret ==0: # 孫進程 # 2兒子進程 print("11") else: # 父進程 # 子進程 print("22") ``` ```$ 2 22 1 11 22 11 ``` ```python import os # 父進程 ret = os.fork() if ret ==0: # 子進程 print("1") else: # 父進程 print("2") # 父進程 ret = os.fork() if ret ==0: # 二兒子進程 print("11") else: # 父進程 print("22") ``` ```$ 2 22 1 11 ``` - 三個fork ```python import os os.fork() os.fork() os.fork() print("1") ``` ```$ 1 1 1 1 1 1 1 1 ``` - fork炸彈 ```python import os while Ture: os.fork()wwww ``` - multiprocessing - 由於fork在windows無法調用 - multiprocessing就是跨平台module ```python from multiprocessing import Process import time def test(): while True: print("test") time.sleep(1) p = Process(target=test) # 指定進程執行test p.start # 讓該進程開始執行 while True: print("main") time.sleep(1) ``` ```$ main test main test test main test main ... ``` ```python # 主進程先結束 from multiprocessing import Process import time def test(): for i in range(5): print("test") time.sleep(1) p = Process(target=test) p.start() # 讓這個進程開始執行test函數的code # 在fork中, 不論childProcess有無執行結束, mainProcess結束即跳出$ # 在Process中, mainProcess結束後, 會待childProcess結束才結束 ``` - 補充: - [殭屍進程](https://zh.wikipedia.org/wiki/%E5%83%B5%E5%B0%B8%E8%BF%9B%E7%A8%8B): - 如子進程結束後, 父進程沒將子進程佔用的內存收回, - 在內存尚未被處理期間, 子進程稱為殭屍進程 - [孤兒進程](https://zh.wikipedia.org/wiki/%E5%AD%A4%E5%84%BF%E8%BF%9B%E7%A8%8B): - 如子進程尚未結束而父進程結束了, - 此時子進程稱為孤兒進程 - 誰處理孤兒進程? - PID 0: 切換進程佔用CPU(負責切換任務) - PID 1: 直接/間接創造所有子進程(負責生產小弟) - 孤兒進程即由PID 1 處理 ```$ test test test test test $ ``` ```python # 給target傳參數 # Process([group [, target [, name [, args [, kwargs]]]]]) from multiprocessing import Process import os def test(num): print("child: pid=%d,ppid=%d,num=%d"%(os.getpid(), os.getppid(), num)) p = Process(target=test, args=(100,)) p.start() print("main: pid=%d"%os.getpid()) ``` ```python main: pid=2665 child: pid=2666,ppid=2665,num=100 ``` ```python # join前情提要 # 欲使mainProcess在P執行完後執行 # 惟P執行是隨機秒數 from multiprocessing import Process import time import random def test(): for i in range(random.randint(1,5)): # random.randint 回傳一個隨機整數 print("%d"%i) time.sleep(1) p = Process(target=test) p.start() # time.sleep(2) 永遠無法預測P幾秒後結束 # join: 待進程實例結束後或等待多少秒 # join([timeout]) # 此方法稱為『堵塞』, 亦即待某條件發生才執行 # 該條件發生時稱為『解堵塞』 # timeout為等待的最長時間, 時間到就繼續執行 p.join() print("main") ``` ```$ $ time.sleep(2) 0 1 2 main 3 4 $ time.sleep(2) 0 1 main 2 3 $ p.join() 0 1 2 3 4 main $ p.join() 0 1 main ``` - Process子類 - 有時Process並非只單純執行一個函數 - 而需要很多複雜的內容時 - 此時可以實例化一個類,並繼承Process - 講義補充說明 - time.time: 格林威治時間 - 作用: 判斷運行時間->判斷效率 - 彙編>C>Python>Ruby - time.ctime: 現在時間 ```python from multiprocessing import Process import time class NewProcess(Process): def run(self): while True: print("1") time.sleep(1) p = NewProcess() p.start() while True: print("2") time.sleep(1) # Q. p = Process(target=test), 創建對象時需要指定函數名, # 那class沒有指定函數, 該Process要執行甚麼? # A. start()會調用run(), 故此時重寫run()即可執行欲執行 # # RV. 單例模式、簡單工廠方法 `` ```$ 2 1 1 2 ... ``` - 進程池Pool - 先創建一堆進程等待使用, 類似於Ruby的運作模式 - mainProcess通常用來等待,任務都childPorcess執行 ```python from multiprocessing import Pool import os import time def worker(num): for i in range(3): print("pid:%d, num=%d"%(os.getpid(), num)) time.sleep(1) pool = Pool(3) # 定義進程池, 最大3個進程數 for i in range(5): print("%d"%i) # Pool.apply_async(調用目標,(參數元祖,)) # 向進程池添加任務 pool.apply_async(worker, (i,)) # Q. 只有3個進程數要處理5個任務? # A. 會依序進程處理, 第一批處理三個, 完成後補上下個任務 pool.close() # 關閉進程池, 不再接收新任務 pool.join() # mainProcess不會待childProcess結束才結束, # 如沒設join, 會導致任務池還沒執行即關閉程序 # ps. 不一定Pool設定的最大進程數越大越好, # 須經由壓力測試, 平衡出最適值 ``` ```$ 0 1 2 3 4 pid:14861, num=0 pid:14862, num=1 pid:14863, num=2 pid:14861, num=0 pid:14862, num=1 pid:14863, num=2 pid:14862, num=1 pid:14861, num=0 pid:14863, num=2 pid:14861, num=3 pid:14862, num=4 pid:14861, num=3 pid:14862, num=4 pid:14861, num=3 pid:14862, num=4 ``` - apply 堵塞式 ```python from multiprocessing import Pool import os import time def worker(num): for i in range(3): print("pid:%d, num=%d"%(os.getpid(), num)) time.sleep(1) pool = Pool(3) for i in range(5): print("%d"%i) pool.apply(worker, (i,)) # 堵塞 # 完成此任務才繼續下個任務 pool.close() pool.join() ``` ```$ 0 pid:15837, num=0 pid:15837, num=0 pid:15837, num=0 1 pid:15838, num=1 pid:15838, num=1 pid:15838, num=1 2 pid:15839, num=2 pid:15839, num=2 pid:15839, num=2 3 pid:15837, num=3 pid:15837, num=3 pid:15837, num=3 4 pid:15838, num=4 pid:15838, num=4 pid:15838, num=4 ``` 進程間通信 --- - Queue ```python In [1]: from multiprocessing import Queue In [2]: q = Queue(3) In [3]: q.qsize() NotImplementedError: # Queue.put(obj, block=True, timeout=None) # 寫入隊列,類型不拘 In [4]: q.put("GodJJ") In [6]: q.put([]) In [7]: q.put({}) In [8]: q.put(100) ^C # 空間不夠時,就會堵塞 # nowait 不等待, 直接異常, 通常放在try裡 In [22]: q.put_nowait("wetDream") Full: In [9]: q.get() Out[9]: 'GodJJ' In [10]: q.get() Out[10]: 'DaGG' In [11]: q.get() Out[11]: 'Toyz' In [12]: q.get() ^C # 沒東西取了, 亦發生堵塞 In [18]: q.get_nowait() Empty: # empty: 看隊列是否為空 In [13]: q.empty() Out[13]: True # full: 看隊列是否為滿 In [15]: q.full() Out[15]: False ``` - ProcesssPool間的Queue - multiprocessing.Manager().Queue() - 多進程copyFile ```python from multiprocessing import Pool, Manager import os def copyFile(name, oldFloderName, newFloderName, queue): #fr = open(name) #fw = open(name, w) # 此時打開的是文件夾裡面的檔案名, # 惟當前路徑下只有文件夾的名字, 故會打不開 # 此時應該是open("./test/"+name), # 但不應該將路徑寫死, 故把路徑傳進來 fr = open(oldFloderName+"/"+name) fw = open(newFloderName+"/"+name, "w") content = fr.read() fw.write(content) fr.close() fw.close() queue.put(name) def main(): # 0.獲取要copy的文件夾名 oldFloderName = input("請輸入文件名: ") # 1.創建文件夾 newFloderName = oldFloderName + "-複製" #print(newFloderName) os.mkdir(newFloderName) # 2.獲取文件夾中所有文件名 fileNames = os.listdir(oldFloderName) #print(fileNames) # 3.多進程copy po = Pool(3) queue = Manager().Queue() # apply_async(func, args=(), kwds={}, callback=None, error_callback=None) # po.apply.async(copyFile, args=(fileName)) # 將整個列表傳進去複製, 此時僅吩咐一個任務進去執行,而非多任務 for name in fileNames: po.apply_async(copyFile, args=(name, oldFloderName, newFloderName, queue)) # 查看進度 num = 0 allNum = len(fileNames) while True: queue.get() num+=1 copyRate = num/allNum print("\rcopy的進度: %.2f%%"%(copyRate*100), end="") # \r:歸位; .2f:浮點數兩位; %%:% if num==allNum: break print("\ncomplete") # \n: 換行 if __name__ == "__main__": main() ```$ $ python3 xxx.py 請輸入文件名: test copy的進度: 100.00% complete $ ls xxx.py test-複製 test $ cat test/1.py a=100 $ cat test-複製/1.py a=100 ``` - 進程理論上是直接copy一份到子進程, 惟如此會浪費空間, 故os會採用[cow](https://zh.wikipedia.org/wiki/%E5%AF%AB%E5%85%A5%E6%99%82%E8%A4%87%E8%A3%BD), 必要時才拷貝 線程 --- - 每一進程有一個箭頭, 該箭頭就稱為主線程 - 進程是資源分配的單位, 線程是CPU調度的單位 ```python # 單線程執行 import time def test(): print("1") time.sleep(1) print(time.time()) for i in range(5): test() print(time.time()) ``` ```$ 1559283205.543259 1 1 1 1 1 1559283210.558901 ``` ```python # 多線程執行 from threading import Thread import time def test(): print("1") time.sleep(1) print(time.time()) for i in range(5): t = Thread(target = test) t.start() print(time.time()) ``` ``` 1559283265.34059 1 1 1 1 1 1559283265.3414762 ``` ```python # 另種方式 import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): time.sleep(1) msg = "I'm "+self.name+' @ '+str(i) #name属性中保存的是当前线程的名字 #線程號TID, 進程號PID print(msg) if __name__ == '__main__': t = MyThread() t.start() ``` ```$ I'm Thread-1 @ 0 I'm Thread-1 @ 1 I'm Thread-1 @ 2 ``` - 線程執行順序 ```python import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): time.sleep(1) msg = "I'm "+self.name+' @ '+str(i) print(msg) def test(): for i in range(5): t = MyThread() t.start() if __name__ == '__main__': test() ``` ```$ I'm Thread-1 @ 0 I'm Thread-2 @ 0 I'm Thread-3 @ 0 I'm Thread-5 @ 0 I'm Thread-4 @ 0 I'm Thread-5 @ 1 I'm Thread-3 @ 1 I'm Thread-1 @ 1 I'm Thread-2 @ 1 I'm Thread-4 @ 1 I'm Thread-5 @ 2 I'm Thread-1 @ 2 I'm Thread-3 @ 2 I'm Thread-4 @ 2 I'm Thread-2 @ 2 # 每次順序都不同, 依據os的調用方法 ``` - 線程共享全局變量 - 優點: 省掉進程間通信的麻煩 - 問題: 同步使用數據的混亂 - 同步 - 解決辦法 - sleep - [輪詢](https://zh.wikipedia.org/wiki/%E8%BC%AA%E8%A9%A2): 效率差 - 互斥鎖 ```python from threading import Thread import time g_num = 0 def test1(): global g_num for i in range(1000000): g_num += 1 print("---test1---g_num=%d"%g_num) def test2(): global g_num for i in range(1000000): g_num += 1 print("---test2---g_num=%d"%g_num) p1 = Thread(target=test1) p1.start() # time.sleep(3) # 取消# 即正常, 待p1加完才加p2 p2 = Thread(target=test2) p2.start() print("---g_num=%d---"%g_num) ``` - 輪詢 ```python import time g_num = 0 g_flag = 1 def test1(): global g_num global g_flag if g_flag == 1: for i in range(1000000): g_num += 1 g_flag = 0 print("---test1---g_num=%d"%g_num) def test2(): global g_num while True: # 輪詢 # 沒有while True的話, if判斷不等於1時就不執行而直接結束, # 故結果會少1000000 if g_flag != 1: for i in range(1000000): g_num += 1 break print("---test2---g_num=%d"%g_num) p1 = Thread(target=test1) p1.start() p2 = Thread(target=test2) p2.start() print("---g_num=%d---"%g_num) # 輪詢缺點: 沒有效率-> while True無意義的佔CPU ``` - 互斥鎖 ```python import time g_num = 0 g_flag = 1 def test1(): global g_num # 上鎖 # acquire(blocking=True, timeout=-1) # blocking: 遇到上鎖的反應; # True: 等 # False: 不等 # timeout: 等待時間; # -1: 永遠; # 2: 2秒 mutex.acquire() for i in range(1000000): g_num += 1 # 解鎖 mutex.release() print("---test1---g_num=%d"%g_num) def test2(): global g_num mutex.acquire() for i in range(1000000): g_num += 1 mutex.release() print("---test2---g_num=%d"%g_num) # 創建一把鎖, 默認未上鎖 mutex = Lock() p1 = Thread(target=test1) p1.start() p2 = Thread(target=test2) p2.start() print("---g_num=%d---"%g_num) # 誰先上鎖無所謂, 先上鎖的就會繼續往下走, 另一個就會等待解鎖才執行上鎖 # 等待者如何知道已解鎖?解鎖後會『通知』 ``` - 多線程使用非全局變量 ```python import threading import time def test(): num = 100 name = threading.current_thread().name # 顯示線程名稱 print("%s"%name) if name == "Thread-1": num+=1 else: time.sleep(1) print("%s,%d"%(name, num)) t1 = threading.Thread(target=test) t1.start() t2 = threading.Thread(target=test) t2.start() # 結果證明線程不共享非全局變量, # 故不必加鎖 ``` ```$ Thread-1 Thread-1,101 Thread-2 Thread-2,100 ``` - 死鎖 - 設置timeout: [看門狗計時器](https://zh.wikipedia.org/wiki/%E7%9C%8B%E9%96%80%E7%8B%97%E8%A8%88%E6%99%82%E5%99%A8) - Threading.lock().acquire(2) - 實例1: - 每隔一段時間給一個變量加一個值, - 如有異常時而timeout時間過後沒收到值, - 此時就啟動重新運行程序 - 實例2: - 網路給予一段時間, - 以避免網速過慢而馬上報錯 - 異步 - 同步 vs 異步: - 確定執行順序: 同步 - 不確定執行順序: 異步 - 堵塞 vs 非堵塞: 等還是不等 ```python from multiprocessing import Pool import time import os def test(): print("pool: pid=%d, ppid=%d"%(os.getpid(),os.getppid())) for i in range(3): print("%d"%i) time.sleep(1) return "hahah" def test2(args): print("callback func: pid=%d"%os.getpid()) print("callback func: args=%s"%args) pool = Pool(3) pool.apply_async(func=test,callback=test2) while True: time.sleep(1) print("mainProcess: pid=%d"%os.getpid()) ``` ```$ pool: pid=5560, ppid=5558 0 1 mainProcess: pid=5558 2 mainProcess: pid=5558 callback func: pid=5558 callback func: args=hahah mainProcess: pid=5558 mainProcess: pid=5558 mainProcess: pid=5558 # mainProcess執行到一半跑去執行callback # mainProcess執行他的回圈, 什麼時候接到callback不知道 # 一接到callback馬上去執行 ``` - [GIL](https://zh.wikipedia.org/wiki/%E5%85%A8%E5%B1%80%E8%A7%A3%E9%87%8A%E5%99%A8%E9%94%81) - GIL保證了多線程中在同時只有一個線程會被調用 - 亦即python的多線程實際上是假的 - 亦即多任務效率: 進程>線程, 因為python鎖住了 - 克服方法: - 用進程 - 用關鍵處用c語言寫 ```python # 一個死循環 while True: pass ``` ```python # 線程死循環 import threading def test(): while True: pass t = threading.Thread(target=test) t.start() while True: pass ``` ```python # 進程死循環 import multiprocessing def test(): while True: pass p = multiprocessing.Process(target=test) p.start() while True: pass # 證明: # 執行後用htop監看cpu # 很明顯進程死循環運用資源比線程高很多 # 而執行一個死循環跟線程死循環的cpu差不多 ``` - 使用C語言寫 `$ vi loop.c` ```c void DeadLoop() { while(1) { ; } } ``` ```$ $ gcc loop.c -shared -o test.so $ ls loop.c test.so ``` `$ vi DeadLoop.py` ```python from ctypes import * from threading import Thread # lib 指向那個編譯過的檔案 lib = cdll.LoadLibrary("./test.so") # 調用檔案裡的函數 t = Thread(target=lib.DeadLoop) t.start() while True: pass ```