owned this note
owned this note
Published
Linked with GitHub
# python3 常用筆記與平行計算
***本篇資料來源為莫煩 python:**
https://morvanzhou.github.io/
- ## python 文件讀寫
w 讀
r 寫
a 附加 (append)
```python=1
my_file = open('myfile.txt','w')
my_file.write("XD")
my_file.close
```
```python=1
file = open('myfile.txt','r')
content = file.read()
content_arr = file.readlines() // 以 array 方式逐行儲存
print(content)
```
目前這樣容易會忘了關檔,這時候用 with 是個不錯的選擇
- ## python tuple & list
``` python=1
a_tuple = (1,2,3,4)
another_tuple = 2,4,6,8,10
a_list = [1,2,3,4,5]
a_list.sort(reverse=True)
print(a_tuple) # (1, 2, 3, 4)
print(another_tuple) # (2, 4, 6, 8, 10)
print(a_list) # [5, 4, 3, 2, 1]
```
- ## python dictionary
```python=1
dic = {'key1':1,'key2':2,'key3':3, 4:'haha'}
dic2 = {'pear':{1:3, 3:'a'}, 'orange':'2017'}
# 沒有順序
print(dic) # {'key3': 3, 'key2': 2, 'key1': 1, 4: 'haha'}
print(dic['key1']) # 1
print(dic[4]) # haha
print(dic2['pear'][3]) # a
# 新增
dic['new'] = 99
# 刪除
del dic['key2']
print(dic) # {'key3': 3, 'key1': 1, 4: 'haha', 'new': 99}
```
- ## python set
利用 set (集合) 來找出所有不重複的元素
```python=1
#coding=utf-8
my_set = ['a','a','a','b','b','c','c','c','c','d','e','ee','e']
# 利用 set 將不重複的種類印出來
print(set(my_set)) # {'d', 'e', 'b', 'a', 'c', 'ee'}
# 使用 set 把一個句子裡面的字元集合找出來
sentence = 'Today is Tuesday, said tao'
print(set(sentence)) # {'d', 'T', 'i', 'y', 'e', 'u', ' ', 't', ',', 'a', 'o', 's'}
# 新增與刪除
unique_char=set(my_set) # {'d', 'e', 'b', 'a', 'c', 'ee'}
unique_char.add('x')
unique_char.remove('a')
print(unique_char) # {'e', 'b', 'c', 'x', 'd', 'ee'}
# 交集與差集
set1 = set(my_set)
set2 = set(sentence)
# 交集:找出 set1 與 set2 都有的元素
print(set1.intersection(set2)) # {'d', 'e', 'a'}
# 差集:找出 set1 裡面有 但是在 set2 裡面沒有的
print(set1.difference(set2)) # {'b', 'c', 'ee'}
```
- ## python import module
```python=1
import time
print(time.time()) # 1492059020.73
print(time.localtime()) # time.struct_time(tm_year=2017, tm_mon=4, tm_mday=13, tm_hour=12, tm_
# min=50, tm_sec=20, tm_wday=3, tm_yday=103, tm_isdst=0)
```
```python=1
from time import time, localtime
print(time()) # 1492059020.73
print(localtime()) # time.struct_time(tm_year=2017, tm_mon=4, tm_mday=13, tm_hour=12, tm_
# min=50, tm_sec=20, tm_wday=3, tm_yday=103, tm_isdst=0)
```
- **import 自己的 module**
寫一個自己腳本 module1.py
```python=1
def method(data):
print(data)
```
在程式裡面 import 剛剛寫的 module 並呼叫 function
main.py
```python=1
import module1
module1.method("Hi Hi ~")
```
:::info
上面的 main.py 跟 module1.py 需要在同一個目錄下才 import 的到。
如果想要在任何地方 import 我們自己寫的 module1 的話,就要把 module1.py 複製到目錄: /Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/site-packages/
相當於使用套件管理工具安裝套件的效果
:::
- ## python 例外處理
```python=1
try:
file = open('ooxx','r')
except Exception as e:
print('no such file')
else:
file.write('XD')
file.close()
```
- ## 使用 pickle 存變數資料與運算結果
- **存資料**
```python=1
#coding=utf-8
import pickle
# 注意:使用 pickle 的時候,檔名不可以命名成 pickle.py
a = [1,3,5,7,9]
d = {'cat':1000, 2:['1','a','XX',3], '3':{'k':'tao', 6:10}}
s = 'my name'
# 存資料
with open('my_file.pickle','wb') as file:
pickle.dump(a, file) # 使用 dump 把 data 倒進去 file 裡面
pickle.dump(d, file)
pickle.dump(s, file)
```
- **讀取資料**
```python=1
#coding=utf-8
import pickle
# 注意:使用 pickle 的時候,檔名不可以命名成 pickle.py
# 拿資料
with open('my_file.pickle','r') as file1:
a = pickle.load(file1) # 逐行拿資料,包括可以拿到 data type
d = pickle.load(file1)
s = pickle.load(file1)
print(a)
print(d)
print(s)
```
- ## 深複製(deep copy)與淺複製(shallow copy)
- **assign 與 copy 的差別**
```python=1
#coding=utf-8
import copy
a = [1,2,3]
b = a # 等號代表 assign
print(id(a)) # 4432640984 記憶體位址
print(id(b)) # 4432640984 記憶體位址
print(id(a)==id(b)) # True
b[0] = 11
print(a) # [11, 2, 3]
a[1] = 22
print(b) # [11, 22, 3]
c = copy.copy(a) # 淺複製
print(id(a)==id(c)) # False
c[1] = 22222
print(a) # [11, 22, 3]
print(c) # [11, 22222, 3]
```
- **assign, copy 與 deepcopy 的差別**
```python=1
#coding=utf-8
import copy
a = [1,2,[3,4]]
b = a # [1,2,[3,4]]
print("*** assign ***") # 指到同一塊記憶體空間
print(id(a)==id(b)) # True
print(id(a[2])==id(b[2])) # True
d = copy.copy(a) # [1,2,[3,4]]
print("*** shallow copy ***" ) # 淺複製 只在淺部分開一塊新的空間
print(id(a)==id(d)) # False
print(id(a[2])==id(d[2])) # True # 深的部分仍然是使用同一塊記憶體
e = copy.deepcopy(a) # [1,2,[3,4]]
print("*** deep copy ***" ) # 深複製 完全開另一塊全新的空間
print(id(a)==id(e)) # False
print(id(a[2])==id(e[2])) # False
# 改變值的實驗 demo
a[0] = 11 # [1, 2, [3, 4]] -> [11, 2, [3, 4]]
print(b) # [11, 2, [3, 4]] 跟著改變
print(d) # [1, 2, [3, 4]] 不受影響
print(e) # [1, 2, [3, 4]] 不受影響
a[2][0] = 333 # [11, 2, [3, 4]] -> [11, 2, [333, 4]]
print(b) # [11, 2, [333, 4]] 跟著改變
print(d) # [1, 2, [333, 4]] 跟著改變(深層的部分)
print(e) # [1, 2, [3, 4]] 不受影響
```
- ## python threading 使用
- **添加thread, 檢視thread, 執行thread**
```python=1
#coding=utf-8
import threading
def thread_job():
# 把目前的 thread 顯示出來看看
print("This is an added Thread, number is {}\n".format(threading.current_thread()))
def main():
# 添加一個 thread
added_thread = threading.Thread(target=thread_job)
# 執行 thread
added_thread.start() # This is an added Thread, number is <Thread(Thread-1, started 123145466363904)>
# 看目前有幾個 thread
print(threading.active_count()) # 2
# 把所有的 thread 顯示出來看看
print(threading.enumerate()) # [<_MainThread(MainThread, started 140736627270592)>, <Thread(Thread-1, started 123145466363904)>]
# 把目前的 thread 顯示出來看看
print(threading.current_thread()) #<_MainThread(MainThread, started 140736627270592)>
if __name__ == '__main__':
main()
```
- **join 的功用與用法**
例1
```python=1
# without join
import threading
import time
def thread_job():
print('T1 start\n')
for i in range(10):
time.sleep(0.1)
print('T1 finish')
def main():
thread1 = threading.Thread(target=thread_job, name='T1')
thread1.start()
print('all done')
if __name__=='__main__':
main()
```
output:
因為 thread_job 裡面做的事情比較慢,所以T1 finish 在 all done 後面,也就是主線程(main thread)先執行完。
```
T1 start
all done
T1 finish
```
如果主線程要等 T1 做完的話,就要在 print('all done') 前面使用 join()
例1 使用 join():
```python=1
# use join
import threading
import time
def thread_job():
print('T1 start\n')
for i in range(10):
time.sleep(0.1)
print('T1 finish')
def main():
thread1 = threading.Thread(target=thread_job, name='T1')
thread1.start()
print('all done')
if __name__=='__main__':
main()
```
output:
```
T1 start
T1 finish
all done
```
例2:
我們新增另一個 thread T2 他要做 T2_job
```python=1
# without join
import threading
import time
def thread_job():
print('T1 start\n')
for i in range(10):
time.sleep(0.1)
print('T1 finish')
def T2_job():
print('T2 start')
time.sleep(0.1)
print('T2 finish')
def main():
thread1 = threading.Thread(target=thread_job, name='T1')
thread1.start()
thread2 = threading.Thread(target=T2_job, name='T2')
thread2.start()
print('all done')
if __name__=='__main__':
main()
```
output:
```
T1 start
T2 start
all done
T2 finish
T1 finish
```
例2 使用 join:
```python=1
# without join
import threading
import time
def thread_job():
print('T1 start\n')
for i in range(10):
time.sleep(0.1)
print('T1 finish')
def T2_job():
print('T2 start')
time.sleep(0.1)
print('T2 finish')
def main():
thread1 = threading.Thread(target=thread_job, name='T1')
thread1.start()
thread2 = threading.Thread(target=T2_job, name='T2')
thread2.start()
thread2.join()
thread1.join()
print('all done')
if __name__=='__main__':
main()
```
output:
```
T1 start
T2 start
T2 finish
T1 finish
all done
```
- **使用 queue 來接收 thread 執行後回傳的資料**
目前我們的 thread 所做的事情是沒有回傳值的
如果要讓 thread 運算執行後回傳資料的話,則需要使用 queue 來接。
threading03_queue.py
```python=1
#coding=utf-8
import threading
import time
from queue import Queue
def thread_job(arr, q):
for i in range(len(arr)):
arr[i] = arr[i]**2
q.put(arr) # 將結果放進 queue
# return arr # 上面這一步驟取代了 return
def multithreading():
q = Queue() # 宣告 Queue 物件
threads = [] # 用來放 thread 的 array
data = [[1,2,3],[4,5,6],[7,7,7],[5,5,5]]
for i in range(4):
t = threading.Thread(target=thread_job, args=(data[i], q)) # 將 data 與 queue 傳入 thread 裡面
t.start()
threads.append(t)
for thread in threads:
thread.join() # 每個 thread 都要做 join
results = [] # 用來接收與顯示結果的 array
for _ in range(4):
results.append(q.get()) # 取出 queue 裡面的資料
print(results) # 顯示執行後的結果
if __name__=='__main__':
multithreading()
'''
執行:python3 threading03_queue.py
output:
[[1, 4, 9], [16, 25, 36], [49, 49, 49], [25, 25, 25]]
'''
```
- **lock的用法**
當我們執行多線程時,如果想要等一個線程做完再做其他線程
就必須要使用 lock 先鎖住 thread 然後做完事情後再 release
```python=1
import threading
def job1():
global A, lock
lock.acquire()
for i in range(10):
A += 1
print('job1',A)
lock.release()
def job2():
global A, lock
lock.acquire()
for i in range(10):
A += 10
print('job2',A)
lock.release()
if __name__=='__main__':
lock = threading.Lock()
A = 0
t1 = threading.Thread(target=job1)
t2 = threading.Thread(target=job2)
t1.start()
t2.start()
t1.join()
t2.join()
```
output:
```
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110
```
- ## python multiprocess
- **基本用法跟Threading很像**
```python=1
#coding=utf-8
import multiprocessing as mp
import threading as td
# 使用 multiprocessing 跟使用 threading 的方法非常相似
def job(a,b):
print('doing job')
# 宣告
t1 = td.Thread(target=job, args=(1,2))
p1 = mp.Process(target=job, args=(1,2))
# 開始
t1.start()
p1.start()
# join
t1.join()
p1.join()
```
**須在 main 裡面呼叫 multiprocess**
```python=1
#coding=utf-8
import multiprocessing as mp
def job(a,b):
print('doing job')
# 使用 multiprocessing 要在 main 裡面呼叫
if __name__=='__main__':
p1 = mp.Process(target=job, args=(1,2))
p1.start()
p1.join()
```
- **使用Queue接function的回傳值**
```python=1
#coding=utf-8
import multiprocessing as mp
def job(q):
result = 0
for i in range(10):
result += i
q.put(result)
# 使用 multiprocessing 須在 main 裡面用
if __name__=='__main__':
q = mp.Queue() # 使用 queue 接收 function 的回傳值
p1 = mp.Process(target=job, args=(q,)) # 特別注意 這邊的傳入參數只有一個的話,後面要有逗號
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1) # 45
print(res2) # 45
print(res1+res2) # 90
```
:::info
process 跟 process 之間彼此是獨立工作的,並不會共用同一份資料,所以 process 之間可以確保可平行化處理,跟 Thread 不同,Thread 有 shared data (可以直接共用 global 變數),且會受 GIL(Global Interpreter Lock) 的保護影響,執行起來不一定能平行化
GIL簡介:
http://blog.ephrain.net/python-python-gil-%E7%9A%84%E5%95%8F%E9%A1%8C/
:::
- **一般情形,multi-Thread 與 multi-Process 效能比較**
```python=1
#coding=utf-8
import multiprocessing as mp
import threading as td
import time
def job(q):
result = 0
for i in range(100000):
result += i+i**2+i**3
q.put(result)
# 使用 multi-process
def multicore():
q = mp.Queue() # 使用 queue 接收 function 的回傳值
p1 = mp.Process(target=job, args=(q,)) # 特別注意 這邊的傳入參數只有一個的話,後面要有逗號
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:',res1+res2)
# 使用 multi-thread
def multithread():
q = mp.Queue() # multiprocess 的 queue 可以用在 thread
t1 = td.Thread(target=job, args=(q,)) # 特別注意 這邊的傳入參數只有一個的話,後面要有逗號
t2 = td.Thread(target=job, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:',res1+res2)
def normal():
result = 0
# 因為上面都是 2 個 thread 或 2 個 process (core)
# 所以要對比一般狀況的效能,要做兩次才公平
for _ in range(2):
for i in range(100000):
result += i+i**2+i**3
print('normal:',result)
if __name__=='__main__':
time0 = time.time()
normal()
time1 = time.time()
print('normal time:', time1 - time0)
multithread()
time2 = time.time()
print('multithread time:', time2 - time1)
multicore()
time3 = time.time()
print('multicore time:', time3 - time2)
''' 結果
('normal:', 49999666671666600000L)
('normal time:', 0.03933405876159668)
('multithread:', 49999666671666600000L)
('multithread time:', 0.06222200393676758)
('multicore:', 49999666671666600000L)
('multicore time:', 0.025996923446655273)
'''
```
根據以上實驗結果,我們發現 multi-process (multicore) 最有效率,所需時間最短。
multi-thread 因為會受 GIL 影響,在某些 case 之下效率未必能比普通狀況還要好,所以要使用 multi-thread 來提升程式運算效能,要看任務的性質找適當的時機使用。
- **使用Process Pool自動分配多核心,接收function回傳值**
```python=1
#coding=utf-8
import multiprocessing as mp
def job(x):
return x*x
def multicore():
# 使用 Pool 自動分配給 CPU 的每個一核心 (core)
pool = mp.Pool()
result = pool.map(job, range(10)) # 使用 pool 還可以接到 function 的回傳值
print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 使用 Pool 指定分配給 CPU 的 3 核心 (core)
pool = mp.Pool(processes=3)
result = pool.map(job, range(10)) # 使用 pool 還可以接到 function 的回傳值
print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 除了 map 功能以外,還有另一個功能: apply_async
# apply_async 一次只能在一個核心中算一個東西
res = pool.apply_async(job,(2,)) # 這個只能把一個值放在一個核心運算一次
print(res.get()) # 4
# 如果要輸入一串的話,可以用迭代的方法搭配 apply_async,達到多核心計算的效能
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
print([res.get() for res in multi_res]) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
if __name__=='__main__':
multicore()
```
- **Shared Memory共享內存**
```python=1
import multiprocessing as mp
# 宣告共用變數
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
# 宣告共用陣列
array = mp.Array('i', [1, 2, 3, 4])
```
:::warning
multiprocess 的共享 Array 只能是一維
:::
- **Process Lock**
以下是一個兩個 process 使用同一個 shared memory 的例子,先觀察一下如果沒有 lock 機制的情形:
```python=1
#coding=utf-8
import multiprocessing as mp
import time
def job(v, num):
for _ in range(10):
time.sleep(0.1)
v.value += num # 使用共享資料取值要用 value
print(v.value)
def multicode():
v = mp.Value("i",0) # 宣告一個 process 之間共享的變數
p1 = mp.Process(target=job, args=(v,1)) # 把 v 傳值進去
p2 = mp.Process(target=job, args=(v,3))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicode()
```
目前的 output
```
1
4
5
8
9
12
13
16
19
19
20
23
24
24
25
25
26
26
27
27
```
可以看出兩個 process 是平行處理的,所以會隨機搶奪 shared 的資料去執行 function
**加上 Lock**
用法跟之前的 Thread Lock 很像
```python=1
#coding=utf-8
import multiprocessing as mp
import time
def job(v, num, loc):
loc.acquire()
for _ in range(10):
time.sleep(0.1)
v.value += num # 使用共享資料取值要用 value
print(v.value)
loc.release()
def multicode():
loc = mp.Lock() # 宣告一個 Lock
v = mp.Value("i",0) # 宣告一個 process 之間共享的變數
p1 = mp.Process(target=job, args=(v,1, loc)) # 把 v 傳值進去
p2 = mp.Process(target=job, args=(v,3, loc))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicode()
```
output:
```
1
2
3
4
5
6
7
8
9
10
13
16
19
22
25
28
31
34
37
40
```
有了 Lock 的保護機制,這次的 output 就很整齊,依序先做完 p1 做的事 (逐步加一) 再做 p2 做的事 (逐步加三)
## 附錄
### 使用ffmpeg 將 wav 轉成 mp3
使用指令
```
ffmpeg -i input.wav -vn -ar 44100 -ac 2 -ab 192k -f mp3 output.mp3
```
但是指令只能一次一個檔案轉檔,我有一整個資料夾的wav檔都要轉,那就...
動手寫一個 converWavToMp3.py 吧 ~
```python=1
#coding=utf-8
import os
import subprocess
# 走訪現在所在的目錄
for dirPath, dirNames, fileName in os.walk("./"):
for file_name in fileName:
if file_name.endswith('.wav') or file_name.endswith('.WAV'):
print file_name
subprocess.call('ffmpeg -i {} -vn -ar 44100 -ac 2 -ab 192k -f mp3 {}.mp3'.format(file_name,file_name[:-4]),shell=True)
```
## 使用 multiprocess.Pool() 做多參數傳遞的函式
當我想在多核心運算的 function 的傳入參數不只一個的時候,或是傳入參數比較複雜的時候,需要一些 iteration tool 來幫忙
### zip 的功用
幫你把兩個 list 的參數一對一對應!
```python=1
#coding=utf-8
if __name__=='__main__':
a = [1,2]
b = [3,4]
# 這樣寫只能印出物件的記憶體位置
print(zip(a,b)) # <zip object at 0x1026ad148>
# 這樣寫才能看到內容
print(list(zip(a,b))) # [(1, 3), (2, 4)]
for i,j in zip(a,b):
print(i,j)
'''output:
1 3
2 4
'''
# 當 a, b 數量一樣的時候 zip 會幫你一對一對起來
```
### repeat 的功用
協助把一個有多個數值的 list 與單一參數做多對一對應!
自動把單一個參數 "repeat" 多次,以至與 list 得數量相同
```python=1
#coding=utf-8
from itertools import repeat
if __name__=='__main__':
# 但是當 a, b 數量不同的時候怎麼辦呢?
# 例如 a 是個 list, b 只是一個數字
a = [1,2,3]
b = 4
# 使用迭代工具 repeat 來自動幫 b 值做 repeat
print(list(zip(a,repeat(b)))) # [(1, 4), (2, 4), (3, 4)]
# 又或者我們要把 a,b,c 三個參數組合,拿去跟一個從零開始逐步加一成長中的數字一起迭代計算
a = 1
b = 2
c = [3.14,2.7183,0.009]
for i,j,k,l in zip(repeat(a),repeat(b),repeat(c),range(3)):
print(i,j,k,l)
'''output
1 2 [3.14, 2.7183, 0.009] 0
1 2 [3.14, 2.7183, 0.009] 1
1 2 [3.14, 2.7183, 0.009] 2
'''
p = 1
q = 2
m = 3
N = [0.2,0.3,0.4,0.5]
# 使用 zip 搭配 repeat 工具可以將所有參數組合好 目前 range(len(N)) 的部分是想要 iteration 的重點項目
print('zip:',list(zip(repeat(p),repeat(q),repeat(m),range(len(N)),repeat(N))))
''' 印出的 zip 長這樣:
zip: [(1, 2, 3, 0, [0.2, 0.3, 0.4, 0.5]), (1, 2, 3, 1, [0.2, 0.3, 0.4, 0.5]),
(1, 2, 3, 2, [0.2, 0.3, 0.4, 0.5]), (1, 2, 3, 3, [0.2, 0.3, 0.4, 0.5])]
'''
```
### 把 zip 與 repeat 搭配使用於 Pool 當中可以做到多參數傳遞
```python=1
#coding=utf-8
import multiprocessing as mp
from itertools import repeat
import numpy as np
# 我們想要在 multiprocess 中執行以下多參數的 function
def count_error(p,q,m,t,N):
print('in func:',p,q,m,t,N)
return p + N[t]
if __name__=='__main__':
# 使用 pool
pool = mp.Pool()
# 使用 starmap 加上 zip 可以支援多參數傳遞,並可以接到 function 的 reutrn 值
res = pool.starmap(count_error, zip(repeat(p),repeat(q),repeat(m),range(len(N)),repeat(N)))
''' 執行 count_error 的過程
in func: 1 2 3 0 [0.2, 0.3, 0.4, 0.5]
in func: 1 2 3 1 [0.2, 0.3, 0.4, 0.5]
in func: 1 2 3 2 [0.2, 0.3, 0.4, 0.5]
in func: 1 2 3 3 [0.2, 0.3, 0.4, 0.5]
'''
print('result:',res) # result: [1.2, 1.3, 1.4, 1.5]
print('sum = ',sum(res)) # sum = 5.4
```
- ## python yield
```python=1
def yield_func():
a = 3
b = 2
yield a
c = 4
yield c
yield b
b = 5
yield b
generator = yield_func()
print generator.next() # 3 (a=3)
print generator.next() # 4 (c=4)
print generator.next() # 2 (b=2)
print generator.next() # 5 (b=5)
```
```python=1
def return_func():
a = 3
b = 2
return a
c = 4
return c
return b
b = 5
return b
print return_func() # 3 (a=3)
print return_func() # 3 (a=3)
print return_func() # 3 (a=3)
print return_func() # 3 (a=3)
```