Python / concurrent
===
###### tags: `Python`
###### tags: `Python`, `concurrent`, `非同步`, `平行, `並行, `併發, `多工`, `futures`, `submit`, `wait`, `asyncio`, `async`, `await`
<br>
[TOC]
<br>
## 參考資料
- [concurrent.futures — 創立非同步任務 — 你所不知道的 Python 標準函式庫用法 06](https://blog.louie.lu/2017/08/01/你所不知道的-python-標準函式庫用法-06-concurrent-futures/)
- [Python Asyncio 協程(二)](https://jimmy-huang.medium.com/e717018bb984)
<br>
## 討論
### 並行發送 request
- ### Question
我正在使用 python 的 requests 套件
我需要發很多 request ,且需要加速執行,以縮短時間
```python
import requests
api_url = "http://example"
data_list = [...]
for i in range(0, 100, 5):
for j in range(5)
data = data_list[i + j]
requests.post(api_url, headers=headers, json=data)
```
我要如何一次並行打 5 個 request?
- ### ChatGPT-4o-2024-08-06
要並行發送多個請求,你可以使用 Python 的 `concurrent.futures` 模組。這個模組提供了一個簡單的方法來使用多線程或多進程,這樣你可以同時發送多個請求。
以下是使用 `concurrent.futures.ThreadPoolExecutor` 的一個範例,來同時發送 5 個請求:
```python=
import requests
import concurrent.futures
api_url = "http://example"
data_list = [...]
headers = {
# 在這裡設定你的 headers
}
def send_request(data):
response = requests.post(api_url, headers=headers, json=data)
return response.status_code, response.json() # 可以根據需要返回更多信息
# 使用 ThreadPoolExecutor 來並行處理請求
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(send_request, data_list[i + j]) for i in range(0, 100, 5) for j in range(5)]
for future in concurrent.futures.as_completed(futures):
try:
status_code, response_json = future.result()
print(f"Status code: {status_code}, Response: {response_json}")
except Exception as e:
print(f"Request failed: {e}")
```
### 說明:
- `ThreadPoolExecutor`:這個工具允許你創建一個線程池,並在其中並行執行任務。這裡 `max_workers=5` 表示最多允許 5 個線程同時運行。
- `executor.submit()`:這個方法將一個函數和它的參數提交給線程池執行。它返回一個 `Future` 對象,該對象可以用來檢查任務的狀態或獲取結果。
- `concurrent.futures.as_completed()`:這個函數可以幫助你以完成順序迭代所有 `Future` 對象。
這樣做,你就可以一次並行發送 5 個請求,大大提高執行速度。
<br>
### 並行發送 request
- ### Question
```
for future in concurrent.futures.as\_completed(futures):
```
回傳的順序,是跟原本發送的順序一樣嗎?
- ### ChatGPT-4o-2024-08-06
使用 `concurrent.futures.as_completed(futures)` 時,返回的結果順序並不會與原本發送的順序一致。`as_completed` 會根據每個 `Future` 對象完成的時間順序來返回結果。也就是說,先完成的請求會先被迭代到,並不保證與提交請求的順序一致。
如果你需要保持返回結果的順序與發送順序一致,可以考慮以下幾種方法:
1. **使用 `Future` 的索引來排序結果*: 你可以在提交請求時附帶一個索引,然後在處理結果時根據這個索引排序。
```python=
import requests
import concurrent.futures
api_url = "http://example"
data_list = [...]
headers = {
# 在這裡設定你的 headers
}
def send_request(data):
response = requests.post(api_url, headers=headers, json=data)
return response.status_code, response.json() # 可以根據需要返回更多信息
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(send_request, data_list[i + j]): i + j for i in range(0, 100, 5) for j in range(5)}
results = [None] * len(futures) # 創建一個與 futures 數量相同的列表來存儲結果
for future in concurrent.futures.as_completed(futures):
index = futures[future]
try:
status_code, response_json = future.result()
results[index] = (status_code, response_json)
except Exception as e:
print(f"Request failed: {e}")
# 現在的 results 是按照原來的順序排列的
for result in results:
print(f"Status code: {result[0]}, Response: {result[1]}")
```
2. **使用 `concurrent.futures.wait()`**: `wait()` 允許你等待所有 Future 完成,然後按原順序處理結果。
不過,如果你需要保持原順序,第一個方法更常用,也更簡單。
這樣可以確保即使請求的返回順序不同,你仍能按照發送請求的順序來處理結果。
<br>
### 使用 `concurrent.futures.wait()` 等待批次結果
- ### Question
```
concurrent.futures.wait()
```
若使用這個用法,上面範例要如何改寫?
- ### ChatGPT-4o-2024-08-06
如果你想使用 `concurrent.futures.wait()` 來處理所有 `Future`,並在所有請求完成後再處理結果,你可以這樣改寫上面的範例:
```python=
import requests
import concurrent.futures
api_url = "http://example"
data_list = [...]
headers = {
# 在這裡設定你的 headers
}
def send_request(data):
response = requests.post(api_url, headers=headers, json=data)
return response.status_code, response.json() # 可以根據需要返回更多信息
# 使用 ThreadPoolExecutor 來並行處理請求
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 使用列表保存 futures
futures = [executor.submit(send_request, data_list[i + j]) for i in range(0, 100, 5) for j in range(5)]
# 等待所有 futures 完成
concurrent.futures.wait(futures)
# 這裡保證結果按發送順序處理
for future in futures:
try:
status_code, response_json = future.result()
print(f"Status code: {status_code}, Response: {response_json}")
except Exception as e:
print(f"Request failed: {e}")
```
說明:
1. **`concurrent.futures.wait(futures)`**: 這個方法會阻塞,直到所有的 `Future` 都完成。這樣確保你在所有請求完成後再進行後續的結果處理。
2. **順序處理結果**: 在 `wait` 完成後,直接對 `futures` 列表進行迭代,這樣可以保證結果處理的順序與 `futures` 列表中的順序一致。
這樣改寫後,你可以確保所有請求完成後,按照發送順序依次處理每個請求的結果。
<br>
## [實戰] 使用 ThreadPoolExecutor 並行發 request
```
from datetime import datetime
import concurrent.futures
import os
import requests
question_list = []
for i in range(3, 53):
q = '+'.join([str(num) for num in range(1, i)]) + '=___'
question_list.append(q)
def calculate(question):
api_url = 'https://api-ffm-stg.twcc.ai/api/models/conversation'
headers = {
'Content-Type': 'application/json',
'X-API-KEY': os.environ['API_KEY'],
}
data = {
'model': 'ffm-llama3-70b',
'parameters': {
'max_new_tokens': 100,
'seed':42
},
'messages': [
{'role': 'user', 'content': question},
]
}
res = requests.post(api_url, headers=headers, json=data)
content = res.json()
return content
batch_size=10
start_time = datetime.now()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for i in range(0, len(question_list), batch_size):
futures = []
for j in range(batch_size):
if i + j >= len(question_list):
break
futures.append(executor.submit(calculate, question_list[i + j]))
concurrent.futures.wait(futures)
for idx, future in enumerate(futures):
print(question_list[i+idx], future.result()['generated_text'])
end_time = datetime.now()
elapsed_time = end_time - start_time
print(elapsed_time)
# - batch_size=10, max_workers=5
# - 0:00:32.568639
# - 0:00:32.397404
# - 0:00:40.476837
# - 0:00:39.900161
# - batch_size=1, max_workers=1
# - 0:02:10.123736
# - 0:02:07.995943
```
執行結果:
```
1+2=___ 3
1+2+3=___ 6
1+2+3+4=___ 1+2+3+4=10
1+2+3+4+5=___ 15
1+2+3+4+5+6=___ 1+2+3+4+5+6=21
1+2+3+4+5+6+7=___ 1+2+3+4+5+6+7=28
1+2+3+4+5+6+7+8=___ 1+2+3+4+5+6+7+8=36
1+2+3+4+5+6+7+8+9=___ 1+2+3+4+5+6+7+8+9=45
1+2+3+4+5+6+7+8+9+10=___ 1+2+3+4+5+6+7+8+9+10=55
1+2+3+4+5+6+7+8+9+10+11=___ 1+2+3+4+5+6+7+8+9+10+11=66
1+2+3+4+5+6+7+8+9+10+11+12=___ 1+2+3+4+5+6+7+8+9+10+11+12=78
1+2+3+4+5+6+7+8+9+10+11+12+13=___ 1+2+3+4+5+6+7+8+9+10+11+12+13=91
1+2+3+4+5+6+7+8+9+10+11+12+13+14=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14=105
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15=120
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16=136
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17=153
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18=171
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19=190
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20=210
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21=231
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22=___ 252
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23=253
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24=300
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25=325
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26=351
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27=378
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28=378
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29=435
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30=465
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31=496
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32=528
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33=561
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34=595
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35=595
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36= 666
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37=666
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38=741
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39=___ 780
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40=810
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41=861
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42=903
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43=903
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44=990
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45=1035
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46=1035
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47=1081
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48=1176
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48+49=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48+49的總和
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48+49+50=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48+49+50=
1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48+49+50+51=___ 1+2+3+4+5+6+7+8+9+10+11+12+13+14+15+16+17+18+19+20+21+22+23+24+25+26+27+28+29+30+31+32+33+34+35+36+37+38+39+40+41+42+43+44+45+46+47+48+49+50+
```
## [實戰] 使用 ThreadPoolExecutor + async 並行發 request
```python=
from datetime import datetime
import asyncio
import concurrent
import os
import requests
question_list = []
for i in range(3, 53):
q = '+'.join([str(num) for num in range(1, i)]) + '=___'
question_list.append(q)
def calculate(question):
api_url = 'https://api-ffm-stg.twcc.ai/api/models/conversation'
headers = {
'Content-Type': 'application/json',
'X-API-KEY': os.environ['API_KEY'],
}
data = {
'model': 'ffm-llama3-70b',
'parameters': {
'max_new_tokens': 100,
'seed':42
},
'messages': [
{'role': 'user', 'content': question},
]
}
res = requests.post(api_url, headers=headers, json=data)
content = res.json()
return content
async def calculate_async(executor, question):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, calculate, question)
return result
batch_size=10
start_time = datetime.now()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for i in range(0, len(question_list), batch_size):
tasks = []
for j in range(batch_size):
if i + j >= len(question_list):
break
tasks.append(calculate_async(executor, question_list[i + j]))
results = await asyncio.gather(*tasks)
for idx, result in enumerate(results):
print(question_list[i+idx], result['generated_text'])
end_time = datetime.now()
elapsed_time = end_time - start_time
print(elapsed_time)
```
<br>
## [實戰] 使用 asyncio + aiohttp 並行發 request
- ### gpt-4o-2024-08-06
你可以使用 Python 的 asyncio 和 aiohttp 套件來實作非同步版本的代碼。這樣可以避免阻塞 I/O 操作,特別是在處理 HTTP 請求時,能夠提高效能。以下是使用 asyncio 和 aiohttp 改寫的代碼:
```python=
import os
import aiohttp
import asyncio
from datetime import datetime
question_list = []
for i in range(3, 53):
q = '+'.join([str(num) for num in range(1, i)]) + '=___'
question_list.append(q)
async def calculate(session, question):
api_url = 'https://api-ffm-stg.twcc.ai/api/models/conversation'
headers = {
'Content-Type': 'application/json',
'X-API-KEY': os.environ['API_KEY'],
}
data = {
'model': 'ffm-llama3-70b',
'parameters': {
'max_new_tokens': 100,
'seed': 42
},
'messages': [
{'role': 'user', 'content': question},
]
}
async with session.post(api_url, headers=headers, json=data) as res:
content = await res.json()
return content
async def main():
batch_size = 8
start_time = datetime.now()
async with aiohttp.ClientSession() as session:
for i in range(0, len(question_list), batch_size):
tasks = [calculate(session, question_list[i + j]) for j in range(batch_size)]
results = await asyncio.gather(*tasks)
for idx, result in enumerate(results):
print(question_list[i + idx], result['generated_text'])
end_time = datetime.now()
elapsed_time = end_time - start_time
print(elapsed_time)
# 執行 asyncio 主程式
asyncio.run(main())
```
### 關鍵變更:
- 使用 `aiohttp` 取代 `requests` 進行非同步 HTTP 請求。
- 將 `calculate` 函數改為 `async def`,並使用 `async with` 來發送非同步請求。
- 使用 `asyncio.gather` 同時處理多個非同步任務。
這樣的方式將會提高批次請求的並行處理能力,特別適合處理大量 HTTP 請求的場景。
<br>
## 討論
### concurrent vs asyncio
> - [Concurrency and async / await](https://fastapi.tiangolo.com/async/)
> - [Get to know Asynchio: Multithreaded Python using async/await](https://daily.dev/blog/get-to-know-asynchio-multithreaded-python-using-asyncawait)
>
- ### Question
python concurrent.futures.ThreadPoolExecutor 和 使用 asyncio 套件
兩邊效能有何差異?還是兩者是相同,只是語法糖的表示?
- ### ChatGPT-4o-20240806
## `concurrent.futures.ThreadPoolExecutor` 和 `asyncio` 套件效能差異
`concurrent.futures.ThreadPoolExecutor` 和 `asyncio` 雖然都能用來實現並行操作,但它們的工作方式和效能在不同場景下有所不同,並非只是語法糖的差異。
### 1. 工作方式
- **`concurrent.futures.ThreadPoolExecutor`**:
- 基於線程的執行模型,它會在不同的線程中執行函數,因此適合用於需要阻塞 I/O 操作(例如檔案讀寫、網路請求)的場景。每個線程都是獨立的,並行地執行任務。
- **`asyncio`**:
- 基於協程的事件驅動模型,適合用於處理 I/O 密集型任務(如非阻塞 I/O)。它不會真正創建多個線程或進程,而是透過單一線程來處理異步任務。適合用於非阻塞的網路 I/O(如 HTTP 請求、資料庫訪問)等場景。
### 2. 效能比較
- **`ThreadPoolExecutor` 效能**:
- 優點:非常適合需要進行阻塞操作(例如,網路 I/O 或磁碟 I/O)的應用程式,因為線程可以在 I/O 等待期間繼續運行其他線程。
- 缺點:線程的開銷較高,尤其是在大量創建或銷毀線程時,這會對 CPU 密集型任務造成額外的開銷。線程之間的上下文切換和同步也會影響效能。
- **`asyncio` 效能**:
- 優點:當處理大量 I/O 密集型任務時,`asyncio` 可以更高效,因為它在單一線程內進行協程切換,避免了線程上下文切換的開銷。它的事件循環允許在 I/O 等待期間繼續執行其他任務。
- 缺點:`asyncio` 不適合 CPU 密集型任務,因為所有協程共用單一線程,無法充分利用多核處理器。如果協程中存在 CPU 密集型任務,則可能會阻塞事件循環。
### 3. 場景選擇
- 使用 **`ThreadPoolExecutor`**:
- 需要阻塞 I/O 的操作(例如,大量的網路請求、檔案讀取/寫入)。
- 需要多核處理器來處理 CPU 密集型任務(例如影像處理、數據處理等)。
- 使用 **`asyncio`**:
- 非阻塞 I/O 場景(例如,大量異步 HTTP 請求、WebSocket 通信)。
- 對效能和資源消耗敏感的場景,特別是需要同時管理大量 I/O 任務時。
### 小結
- `ThreadPoolExecutor` 和 `asyncio` 在 I/O 密集任務上都能提升效能,但 `asyncio` 更高效,因為它避免了線程切換的開銷。然而,當需要處理 CPU 密集型任務時,`ThreadPoolExecutor` 可能會有更好的表現。
<br>
### CPU 密集型任務 是指什麼?有具體說明?
- ### ChatGPT-4o-20240806
## CPU 密集型任務 是什麼?
**CPU 密集型任務**(CPU-intensive tasks)指的是那些主要依賴中央處理器(CPU)進行計算的任務,這些任務通常需要大量的運算資源,而不是等待外部 I/O 資源(如磁碟、網絡)的任務。
### 具體的 CPU 密集型任務包括:
1. **數值計算**:
- 大量的數字運算,例如科學計算、數學建模、矩陣運算、線性代數、求解方程等。
- 例如:求解大規模的矩陣乘法、進行複雜數學運算(如計算圓周率到極高精度)。
2. **數據處理與轉換**:
- 需要處理大量數據且數據處理步驟涉及大量邏輯運算和計算操作的情況,例如數據分析、統計計算、大數據運算等。
- 例如:對大規模資料集進行排序、篩選、分組等操作。
3. **加密與解密**:
- 加密和解密算法,如 AES、RSA 等,這些操作通常涉及大量的數學計算和位運算,因此屬於 CPU 密集型任務。
- 例如:對大型文件進行加密或解密操作。
4. **影像處理**:
- 包括圖片編碼、解碼、壓縮、濾波、特徵提取等任務。影像處理中的算法(如卷積運算)需要大量計算資源。
- 例如:對高分辨率的圖片進行處理,應用圖像濾波、邊緣檢測等算法。
5. **機器學習與深度學習**:
- 訓練機器學習模型和深度學習模型時,涉及大量矩陣運算和浮點數計算,特別是神經網絡訓練時的大量反向傳播算法計算。
- 例如:使用大量數據訓練深度學習模型如卷積神經網絡(CNN)或循環神經網絡(RNN)。
6. **3D 渲染和動畫計算**:
- 渲染三維圖形,尤其是高質量的實時渲染和動畫製作,會涉及大量的幾何計算和光線跟踪算法。
- 例如:生成複雜的三維模型、實時渲染場景中的光影和反射效果。
7. **模擬與模擬器運行**:
- 物理模擬、金融模擬、氣象模擬等,這些任務通常需要模擬系統的行為,並依賴數百萬次計算。
- 例如:流體動力學模擬、天氣預測計算等。
### 與 I/O 密集型任務的區別:
- **I/O 密集型任務**: 主要花費時間等待 I/O 操作的完成,如磁碟讀寫、網絡請求、數據庫操作等。這些任務通常等待外部資源,而非處理器計算。
- **CPU 密集型任務**: 主要花費 CPU 資源進行大量運算,沒有頻繁的 I/O 等待過程。
因此,對於 CPU 密集型任務來說,多核 CPU 或者 GPU 能夠大幅提高處理效能。而對於 I/O 密集型任務,使用異步 I/O 或線程池來同時處理多個 I/O 操作會更有效率。
<br>
### 如果不使用 aiohttp,asynio 是否只是 ThreadPoolExecutor 的語法糖?
- ### gpt-4o-2024-08-06
如果你不使用像 `aiohttp` 這樣的原生異步 HTTP 客戶端,並且仍依賴 `requests.post` 這類同步函數,那麼你可以使用 `ThreadPoolExecutor` 和 `asyncio.run_in_executor()` 來達到並行處理的效果。在這種情況下,`asyncio` 的作用確實是作為一種語法糖,用來簡化與 `ThreadPoolExecutor` 的互動。
要明白這點,先區分以下幾個概念:
1. **同步 vs. 異步**:
- 同步操作會阻塞進程直到操作完成(例如 `requests.post`)。
- 異步操作則允許程式在等待 I/O 時繼續處理其他任務,只有真正需要結果時才會等待。
2. **`ThreadPoolExecutor`**:
它是 `concurrent.futures` 模組的一部分,用於在多個線程中運行同步操作,允許你模擬並行處理。`ThreadPoolExecutor` 適合那些計算密集或 I/O 密集的同步任務,例如文件讀寫或 HTTP 請求。
3. **`asyncio` 的語法糖**:
當你使用 `asyncio.run_in_executor()` 時,實際上是將同步操作提交到 `ThreadPoolExecutor` 中去執行。這意味著你的代碼仍然是同步執行的,但 `asyncio` 幫助你管理了這些同步任務的調度和等待邏輯,讓它看起來像是非同步的。
### 是否只是語法糖?
在這種情況下,**是的,`asyncio` 對 `ThreadPoolExecutor` 的使用就像語法糖一樣**,因為它簡化了多線程的運行管理。但它仍然有以下幾個優點:
1. **統一管理異步和同步操作**:你可以將異步操作(例如 I/O 密集型操作)和同步操作混合使用在同一個事件循環中,這樣程式碼的結構會更加一致,並且可以更靈活地控制執行的任務。
2. **語法簡化**:使用 `async/await` 可以讓程式碼看起來更直觀,特別是在多任務管理和結果收集時,而不需要手動管理 `Future` 物件或回調函數。
3. **更易擴展**:如果未來你切換到真正的非同步 HTTP 客戶端(例如 `aiohttp`),你不需要大幅度更改程式結構,因為程式碼已經使用了 `asyncio` 的結構。
### 具體區別:
1. **`asyncio.run_in_executor()` 使用線程**:它本質上是將同步任務交給線程池來處理,所以實際上仍然會有線程在執行同步任務,並不是完全的非同步。它不能利用 `asyncio` 的事件循環來進行更細粒度的非阻塞操作。
2. **真正的非同步操作(例如 `aiohttp`)**:這種方式則會直接利用 `asyncio` 的事件循環,進行完全的非阻塞 I/O 操作。與線程池相比,這種方式會有更好的效能和更少的資源消耗,特別是在處理大量 I/O 請求時。
### 結論:
當你使用 `requests` 並且配合 `asyncio.run_in_executor()` 時,`asyncio` 確實只是語法糖,背後依然是 `ThreadPoolExecutor` 來並行執行同步任務。真正的異步執行需要原生支持異步 I/O 的庫來實現,例如 `aiohttp`。
<br>