Python多任務
===
KeyWord
- 時間片輪轉
- 優先級調度
- 調度算法
- 併行v.s.併發
進程
---
- 進程 vs 程序
- 運行代碼前-> 程序
- 運行代碼時-> 進程
- fork()
- Win 無法使用
```python
import os
ret = os.fork()
print("haha")
```
```
haha
haha
# 兩個?
```
```python
import os
import time
ret = os.fork()
if ret == 0:
while True:
print("--1--")
time.sleep(1)
else:
while True:
print("--2--")
time.sleep(1)
# 同時出現--1-- & --2--
# 說明:
# 執行程序時會有個主進程開始執行程序
# 執行至os.fork()時, os會創建另一個進程(子進程), 並複製主(父)進程至子中
# 接著父子進程皆會得到返回值, 子進程必為0, 父進程是子進程的pid
# 接著同時往下判斷, 子進程進入--1--迴圈, 父進程進入--2--迴圈
```
```$
--2--
--1--
--1--
--2--
--1--
--2--
--2--
--1--
# 問題. 父進程先還是子進程先?
# 不一定, 視os算法而定
```
- getpid()、getppid()
```python
import os
ret = os.fork()
print(ret)
if ret>0:
print("父進程:%d"%os.getpid())
else:
print("子進程:%d,%d"%(os.getpid(), os.getppid()))
# pid 查看
# $ ps aux
# ps: Process Status
# a: all
# u: user
# x: processes w/o controlling ttys
# getppid: parent process ID
# pid 知識:
# 最大值65535(2^16-1)-> 最大運行66535個進程
# 進程號不允許相同
```
```$
7841
父進程:7840
0
子進程:7841,7840
# 此結果說明主進程創建子進程時, 便會得到子進程的pid
# 而7840創建了7841
```
- 進程先後順序
```python
import os
import time
ret = os.fork()
if ret ==0:
print("子進程")
#time.sleep(1)
time.sleep(5)
print("子進程over")
else:
print("父進程")
time.sleep(3)
print("over")
# $ 以主進程為依據, 主進程結束即跳出, 不會待子進程結束才結束
# 主進程與子進程各自會執行向下的所有代碼, 故over出現了兩次
```
```$
父進程
子進程
over # 3sec後出現
$ 子進程over # $(3sec後出現); 子進程over(5sec後出現)
over # 5sec後出現
# print預設下一行, 如不要下一行:print("",end="")
```
- 進程間數據不共享
```python
import os
import time
num = 100
ret = os.fork()
if ret ==0:
print("1")
num +=1
print("1, num=%d"%num)
else:
time.sleep(3)
print("2")
print("2, num=%d"%num)
```
```$
1
1, num=101
2
2, num=100
```
- 進程間的通信
- 同一台電腦
- PIPE & FIFO
- signal
- 消息對列
- 共享內存
- 信號量
- socket
- ...
- 不同電腦: 網路
-多個fork
```python
import os
# 父進程
ret = os.fork()
if ret ==0:
# 子進程
print("1")
else:
# 父進程
print("2")
# 父子進程
ret = os.fork()
if ret ==0:
# 孫進程
# 2兒子進程
print("11")
else:
# 父進程
# 子進程
print("22")
```
```$
2
22
1
11
22
11
```
```python
import os
# 父進程
ret = os.fork()
if ret ==0:
# 子進程
print("1")
else:
# 父進程
print("2")
# 父進程
ret = os.fork()
if ret ==0:
# 二兒子進程
print("11")
else:
# 父進程
print("22")
```
```$
2
22
1
11
```
- 三個fork
```python
import os
os.fork()
os.fork()
os.fork()
print("1")
```
```$
1
1
1
1
1
1
1
1
```
- fork炸彈
```python
import os
while Ture:
os.fork()wwww
```
- multiprocessing
- 由於fork在windows無法調用
- multiprocessing就是跨平台module
```python
from multiprocessing import Process
import time
def test():
while True:
print("test")
time.sleep(1)
p = Process(target=test) # 指定進程執行test
p.start # 讓該進程開始執行
while True:
print("main")
time.sleep(1)
```
```$
main
test
main
test
test
main
test
main
...
```
```python
# 主進程先結束
from multiprocessing import Process
import time
def test():
for i in range(5):
print("test")
time.sleep(1)
p = Process(target=test)
p.start() # 讓這個進程開始執行test函數的code
# 在fork中, 不論childProcess有無執行結束, mainProcess結束即跳出$
# 在Process中, mainProcess結束後, 會待childProcess結束才結束
```
- 補充:
- [殭屍進程](https://zh.wikipedia.org/wiki/%E5%83%B5%E5%B0%B8%E8%BF%9B%E7%A8%8B):
- 如子進程結束後, 父進程沒將子進程佔用的內存收回,
- 在內存尚未被處理期間, 子進程稱為殭屍進程
- [孤兒進程](https://zh.wikipedia.org/wiki/%E5%AD%A4%E5%84%BF%E8%BF%9B%E7%A8%8B):
- 如子進程尚未結束而父進程結束了,
- 此時子進程稱為孤兒進程
- 誰處理孤兒進程?
- PID 0: 切換進程佔用CPU(負責切換任務)
- PID 1: 直接/間接創造所有子進程(負責生產小弟)
- 孤兒進程即由PID 1 處理
```$
test
test
test
test
test
$
```
```python
# 給target傳參數
# Process([group [, target [, name [, args [, kwargs]]]]])
from multiprocessing import Process
import os
def test(num):
print("child: pid=%d,ppid=%d,num=%d"%(os.getpid(), os.getppid(), num))
p = Process(target=test, args=(100,))
p.start()
print("main: pid=%d"%os.getpid())
```
```python
main: pid=2665
child: pid=2666,ppid=2665,num=100
```
```python
# join前情提要
# 欲使mainProcess在P執行完後執行
# 惟P執行是隨機秒數
from multiprocessing import Process
import time
import random
def test():
for i in range(random.randint(1,5)): # random.randint 回傳一個隨機整數
print("%d"%i)
time.sleep(1)
p = Process(target=test)
p.start()
# time.sleep(2) 永遠無法預測P幾秒後結束
# join: 待進程實例結束後或等待多少秒
# join([timeout])
# 此方法稱為『堵塞』, 亦即待某條件發生才執行
# 該條件發生時稱為『解堵塞』
# timeout為等待的最長時間, 時間到就繼續執行
p.join()
print("main")
```
```$
$ time.sleep(2)
0
1
2
main
3
4
$ time.sleep(2)
0
1
main
2
3
$ p.join()
0
1
2
3
4
main
$ p.join()
0
1
main
```
- Process子類
- 有時Process並非只單純執行一個函數
- 而需要很多複雜的內容時
- 此時可以實例化一個類,並繼承Process
- 講義補充說明
- time.time: 格林威治時間
- 作用: 判斷運行時間->判斷效率
- 彙編>C>Python>Ruby
- time.ctime: 現在時間
```python
from multiprocessing import Process
import time
class NewProcess(Process):
def run(self):
while True:
print("1")
time.sleep(1)
p = NewProcess()
p.start()
while True:
print("2")
time.sleep(1)
# Q. p = Process(target=test), 創建對象時需要指定函數名,
# 那class沒有指定函數, 該Process要執行甚麼?
# A. start()會調用run(), 故此時重寫run()即可執行欲執行
#
# RV. 單例模式、簡單工廠方法
``
```$
2
1
1
2
...
```
- 進程池Pool
- 先創建一堆進程等待使用, 類似於Ruby的運作模式
- mainProcess通常用來等待,任務都childPorcess執行
```python
from multiprocessing import Pool
import os
import time
def worker(num):
for i in range(3):
print("pid:%d, num=%d"%(os.getpid(), num))
time.sleep(1)
pool = Pool(3) # 定義進程池, 最大3個進程數
for i in range(5):
print("%d"%i)
# Pool.apply_async(調用目標,(參數元祖,))
# 向進程池添加任務
pool.apply_async(worker, (i,))
# Q. 只有3個進程數要處理5個任務?
# A. 會依序進程處理, 第一批處理三個, 完成後補上下個任務
pool.close() # 關閉進程池, 不再接收新任務
pool.join() # mainProcess不會待childProcess結束才結束,
# 如沒設join, 會導致任務池還沒執行即關閉程序
# ps. 不一定Pool設定的最大進程數越大越好,
# 須經由壓力測試, 平衡出最適值
```
```$
0
1
2
3
4
pid:14861, num=0
pid:14862, num=1
pid:14863, num=2
pid:14861, num=0
pid:14862, num=1
pid:14863, num=2
pid:14862, num=1
pid:14861, num=0
pid:14863, num=2
pid:14861, num=3
pid:14862, num=4
pid:14861, num=3
pid:14862, num=4
pid:14861, num=3
pid:14862, num=4
```
- apply 堵塞式
```python
from multiprocessing import Pool
import os
import time
def worker(num):
for i in range(3):
print("pid:%d, num=%d"%(os.getpid(), num))
time.sleep(1)
pool = Pool(3)
for i in range(5):
print("%d"%i)
pool.apply(worker, (i,)) # 堵塞
# 完成此任務才繼續下個任務
pool.close()
pool.join()
```
```$
0
pid:15837, num=0
pid:15837, num=0
pid:15837, num=0
1
pid:15838, num=1
pid:15838, num=1
pid:15838, num=1
2
pid:15839, num=2
pid:15839, num=2
pid:15839, num=2
3
pid:15837, num=3
pid:15837, num=3
pid:15837, num=3
4
pid:15838, num=4
pid:15838, num=4
pid:15838, num=4
```
進程間通信
---
- Queue
```python
In [1]: from multiprocessing import Queue
In [2]: q = Queue(3)
In [3]: q.qsize()
NotImplementedError:
# Queue.put(obj, block=True, timeout=None)
# 寫入隊列,類型不拘
In [4]: q.put("GodJJ")
In [6]: q.put([])
In [7]: q.put({})
In [8]: q.put(100)
^C # 空間不夠時,就會堵塞
# nowait 不等待, 直接異常, 通常放在try裡
In [22]: q.put_nowait("wetDream")
Full:
In [9]: q.get()
Out[9]: 'GodJJ'
In [10]: q.get()
Out[10]: 'DaGG'
In [11]: q.get()
Out[11]: 'Toyz'
In [12]: q.get()
^C # 沒東西取了, 亦發生堵塞
In [18]: q.get_nowait()
Empty:
# empty: 看隊列是否為空
In [13]: q.empty()
Out[13]: True
# full: 看隊列是否為滿
In [15]: q.full()
Out[15]: False
```
- ProcesssPool間的Queue
- multiprocessing.Manager().Queue()
- 多進程copyFile
```python
from multiprocessing import Pool, Manager
import os
def copyFile(name, oldFloderName, newFloderName, queue):
#fr = open(name)
#fw = open(name, w)
# 此時打開的是文件夾裡面的檔案名,
# 惟當前路徑下只有文件夾的名字, 故會打不開
# 此時應該是open("./test/"+name),
# 但不應該將路徑寫死, 故把路徑傳進來
fr = open(oldFloderName+"/"+name)
fw = open(newFloderName+"/"+name, "w")
content = fr.read()
fw.write(content)
fr.close()
fw.close()
queue.put(name)
def main():
# 0.獲取要copy的文件夾名
oldFloderName = input("請輸入文件名: ")
# 1.創建文件夾
newFloderName = oldFloderName + "-複製"
#print(newFloderName)
os.mkdir(newFloderName)
# 2.獲取文件夾中所有文件名
fileNames = os.listdir(oldFloderName)
#print(fileNames)
# 3.多進程copy
po = Pool(3)
queue = Manager().Queue()
# apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
# po.apply.async(copyFile, args=(fileName))
# 將整個列表傳進去複製, 此時僅吩咐一個任務進去執行,而非多任務
for name in fileNames:
po.apply_async(copyFile, args=(name, oldFloderName, newFloderName, queue))
# 查看進度
num = 0
allNum = len(fileNames)
while True:
queue.get()
num+=1
copyRate = num/allNum
print("\rcopy的進度: %.2f%%"%(copyRate*100), end="")
# \r:歸位; .2f:浮點數兩位; %%:%
if num==allNum:
break
print("\ncomplete") # \n: 換行
if __name__ == "__main__":
main()
```$
$ python3 xxx.py
請輸入文件名: test
copy的進度: 100.00%
complete
$ ls
xxx.py test-複製 test
$ cat test/1.py
a=100
$ cat test-複製/1.py
a=100
```
- 進程理論上是直接copy一份到子進程, 惟如此會浪費空間, 故os會採用[cow](https://zh.wikipedia.org/wiki/%E5%AF%AB%E5%85%A5%E6%99%82%E8%A4%87%E8%A3%BD), 必要時才拷貝
線程
---
- 每一進程有一個箭頭, 該箭頭就稱為主線程
- 進程是資源分配的單位, 線程是CPU調度的單位
```python
# 單線程執行
import time
def test():
print("1")
time.sleep(1)
print(time.time())
for i in range(5):
test()
print(time.time())
```
```$
1559283205.543259
1
1
1
1
1
1559283210.558901
```
```python
# 多線程執行
from threading import Thread
import time
def test():
print("1")
time.sleep(1)
print(time.time())
for i in range(5):
t = Thread(target = test)
t.start()
print(time.time())
```
```
1559283265.34059
1
1
1
1
1
1559283265.3414762
```
```python
# 另種方式
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm "+self.name+' @ '+str(i)
#name属性中保存的是当前线程的名字
#線程號TID, 進程號PID
print(msg)
if __name__ == '__main__':
t = MyThread()
t.start()
```
```$
I'm Thread-1 @ 0
I'm Thread-1 @ 1
I'm Thread-1 @ 2
```
- 線程執行順序
```python
import threading
import time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I'm "+self.name+' @ '+str(i)
print(msg)
def test():
for i in range(5):
t = MyThread()
t.start()
if __name__ == '__main__':
test()
```
```$
I'm Thread-1 @ 0
I'm Thread-2 @ 0
I'm Thread-3 @ 0
I'm Thread-5 @ 0
I'm Thread-4 @ 0
I'm Thread-5 @ 1
I'm Thread-3 @ 1
I'm Thread-1 @ 1
I'm Thread-2 @ 1
I'm Thread-4 @ 1
I'm Thread-5 @ 2
I'm Thread-1 @ 2
I'm Thread-3 @ 2
I'm Thread-4 @ 2
I'm Thread-2 @ 2
# 每次順序都不同, 依據os的調用方法
```
- 線程共享全局變量
- 優點: 省掉進程間通信的麻煩
- 問題: 同步使用數據的混亂
- 同步
- 解決辦法
- sleep
- [輪詢](https://zh.wikipedia.org/wiki/%E8%BC%AA%E8%A9%A2): 效率差
- 互斥鎖
```python
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
# time.sleep(3) # 取消# 即正常, 待p1加完才加p2
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
```
- 輪詢
```python
import time
g_num = 0
g_flag = 1
def test1():
global g_num
global g_flag
if g_flag == 1:
for i in range(1000000):
g_num += 1
g_flag = 0
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
while True: # 輪詢
# 沒有while True的話, if判斷不等於1時就不執行而直接結束,
# 故結果會少1000000
if g_flag != 1:
for i in range(1000000):
g_num += 1
break
print("---test2---g_num=%d"%g_num)
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
# 輪詢缺點: 沒有效率-> while True無意義的佔CPU
```
- 互斥鎖
```python
import time
g_num = 0
g_flag = 1
def test1():
global g_num
# 上鎖
# acquire(blocking=True, timeout=-1)
# blocking: 遇到上鎖的反應;
# True: 等
# False: 不等
# timeout: 等待時間;
# -1: 永遠;
# 2: 2秒
mutex.acquire()
for i in range(1000000):
g_num += 1
# 解鎖
mutex.release()
print("---test1---g_num=%d"%g_num)
def test2():
global g_num
mutex.acquire()
for i in range(1000000):
g_num += 1
mutex.release()
print("---test2---g_num=%d"%g_num)
# 創建一把鎖, 默認未上鎖
mutex = Lock()
p1 = Thread(target=test1)
p1.start()
p2 = Thread(target=test2)
p2.start()
print("---g_num=%d---"%g_num)
# 誰先上鎖無所謂, 先上鎖的就會繼續往下走, 另一個就會等待解鎖才執行上鎖
# 等待者如何知道已解鎖?解鎖後會『通知』
```
- 多線程使用非全局變量
```python
import threading
import time
def test():
num = 100
name = threading.current_thread().name # 顯示線程名稱
print("%s"%name)
if name == "Thread-1":
num+=1
else:
time.sleep(1)
print("%s,%d"%(name, num))
t1 = threading.Thread(target=test)
t1.start()
t2 = threading.Thread(target=test)
t2.start()
# 結果證明線程不共享非全局變量,
# 故不必加鎖
```
```$
Thread-1
Thread-1,101
Thread-2
Thread-2,100
```
- 死鎖
- 設置timeout: [看門狗計時器](https://zh.wikipedia.org/wiki/%E7%9C%8B%E9%96%80%E7%8B%97%E8%A8%88%E6%99%82%E5%99%A8)
- Threading.lock().acquire(2)
- 實例1:
- 每隔一段時間給一個變量加一個值,
- 如有異常時而timeout時間過後沒收到值,
- 此時就啟動重新運行程序
- 實例2:
- 網路給予一段時間,
- 以避免網速過慢而馬上報錯
- 異步
- 同步 vs 異步:
- 確定執行順序: 同步
- 不確定執行順序: 異步
- 堵塞 vs 非堵塞: 等還是不等
```python
from multiprocessing import Pool
import time
import os
def test():
print("pool: pid=%d, ppid=%d"%(os.getpid(),os.getppid()))
for i in range(3):
print("%d"%i)
time.sleep(1)
return "hahah"
def test2(args):
print("callback func: pid=%d"%os.getpid())
print("callback func: args=%s"%args)
pool = Pool(3)
pool.apply_async(func=test,callback=test2)
while True:
time.sleep(1)
print("mainProcess: pid=%d"%os.getpid())
```
```$
pool: pid=5560, ppid=5558
0
1
mainProcess: pid=5558
2
mainProcess: pid=5558
callback func: pid=5558
callback func: args=hahah
mainProcess: pid=5558
mainProcess: pid=5558
mainProcess: pid=5558
# mainProcess執行到一半跑去執行callback
# mainProcess執行他的回圈, 什麼時候接到callback不知道
# 一接到callback馬上去執行
```
- [GIL](https://zh.wikipedia.org/wiki/%E5%85%A8%E5%B1%80%E8%A7%A3%E9%87%8A%E5%99%A8%E9%94%81)
- GIL保證了多線程中在同時只有一個線程會被調用
- 亦即python的多線程實際上是假的
- 亦即多任務效率: 進程>線程, 因為python鎖住了
- 克服方法:
- 用進程
- 用關鍵處用c語言寫
```python
# 一個死循環
while True:
pass
```
```python
# 線程死循環
import threading
def test():
while True:
pass
t = threading.Thread(target=test)
t.start()
while True:
pass
```
```python
# 進程死循環
import multiprocessing
def test():
while True:
pass
p = multiprocessing.Process(target=test)
p.start()
while True:
pass
# 證明:
# 執行後用htop監看cpu
# 很明顯進程死循環運用資源比線程高很多
# 而執行一個死循環跟線程死循環的cpu差不多
```
- 使用C語言寫
`$ vi loop.c`
```c
void DeadLoop()
{
while(1)
{
;
}
}
```
```$
$ gcc loop.c -shared -o test.so
$ ls
loop.c test.so
```
`$ vi DeadLoop.py`
```python
from ctypes import *
from threading import Thread
# lib 指向那個編譯過的檔案
lib = cdll.LoadLibrary("./test.so")
# 調用檔案裡的函數
t = Thread(target=lib.DeadLoop)
t.start()
while True:
pass
```