# locust
## Brief
API 壓力負載測試
## References
+ 📄 [**Docs - Locust**](https://locust.io/)
+ 📄 [**GitHub - Locust**](https://github.com/locustio/locust)
+ 🔗 [**fourcolor - 壓力測試 Locust 教學**](https://hackmd.io/@fourcolor/BkltPSg0t)
## Note
+ Number of users (peak concurrency):同時發出請求的 user 上限數量
+ Ramp up (user started/second):每秒新增多少個 user
+ 檔名一定要叫 `locustfile.py`
+ command
```
locust -H http://localhost:8000 -f ".\backend\performance\locustfile.py"
```
+ `@task` decorator 裡的數字代表執行權重(多少可能性執行此任務)
+ `response` 可以標記 success 或 failure,這會在監控 GUI 顯示出來。
## Testing
### SUT
+ CPU:Intel Core i5-13400F
+ System:FastAPI + PostgreSQL + Docker(含 login JWT 驗證功能)
+ Workers:1
+ Engine 設定
```py
engine = create_async_engine(
settings.get_settings().database_uri,
pool_size=50,
max_overflow=50,
pool_timeout=40,
pool_recycle=1800,
)
```
+ 以下使用 `[peak-concurrency]-[user-started/sec]` 做為測試名稱
+ 註:PostgreSQL 預設 `max_connections=100` (瓶頸應該是 PostgreSQL)
### Scripts
```py
import json
import random
from collections.abc import Callable
from datetime import datetime
from functools import wraps
from locust import HttpUser, between, task
def ensure_login(func: Callable[["WebsiteUser"], bool]):
@wraps(func)
def wrapper(self: "WebsiteUser", *args, **kwargs):
# 確保 login
if not hasattr(self, "headers"):
while not self.login():
self.login()
# 執行 task
expired = func(self, *args, **kwargs)
# 替換過期 access token
if expired:
while not self.refresh():
self.refresh()
return wrapper
class WebsiteUser(HttpUser):
wait_time = between(1, 3)
serial_num = 1
def on_start(self):
while not self.create_user():
self.create_user()
self.login()
def create_user(self) -> bool:
self.name = f"CrazyFriday{WebsiteUser.serial_num}"
self.email = f"user{WebsiteUser.serial_num}@example.com"
self.password = "securePass123"
self.age = random.randint(18, 60)
self.birthday = datetime(1990, 1, 1).strftime("%Y-%m-%d")
self.item_ids: list[int] = []
WebsiteUser.serial_num += 1
payload = {
"name": self.name,
"email": self.email,
"password": self.password,
"age": self.age,
"birthday": self.birthday,
}
with self.client.post(
"/api/users",
data=json.dumps(payload),
headers={"Content-Type": "application/json"},
catch_response=True,
) as response:
if response.status_code == 201:
response.success()
else:
response.failure("create_user failed")
return response.status_code == 201
def login(self) -> bool:
payload = {
"username": self.email,
"password": self.password,
"grant_type": "password",
}
with self.client.post("/api/auth/login", data=payload, catch_response=True) as response:
if response.status_code == 200:
data = response.json()
self.access_token = data["access_token"]
self.refresh_token = data["refresh_token"]
self.headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
else:
response.failure("Login failed")
return response.status_code == 200
def refresh(self) -> bool:
self.headers = {
"Authorization": f"Bearer {self.refresh_token}",
"Content-Type": "application/json",
}
with self.client.post(
"/api/auth/refresh", headers=self.headers, catch_response=True
) as response:
if response.status_code == 200:
response.success()
data = response.json()
self.access_token = data["access_token"]
self.refresh_token = data["refresh_token"]
self.headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
else:
response.failure("Refresh failed")
return response.status_code == 200
@task(3)
@ensure_login
def get_all_items(self) -> bool:
with self.client.get(
"/api/items?page=1&page_size=10", headers=self.headers, catch_response=True
) as response:
if response.status_code == 200 or response.status_code == 401:
response.success()
else:
response.failure("get_all_items failed")
return response.status_code == 401
@task(5)
@ensure_login
def create_item(self) -> bool:
item = {
"name": "pressure-cooker",
"price": round(random.uniform(10, 1000), 2),
"brand": "Prestige",
"description": "This is an auto-generated item!",
"stock": random.randint(1, 100),
}
with self.client.post(
"/api/items", headers=self.headers, data=json.dumps(item), catch_response=True
) as response:
if response.status_code == 201:
response.success()
item_id: int = response.json()["id"]
self.item_ids.append(item_id)
elif response.status_code == 401:
response.success()
else:
response.failure("create_item failed")
return response.status_code == 401
@task(2)
@ensure_login
def get_item_by_id(self) -> bool:
if self.item_ids:
item_id = random.choice(self.item_ids)
with self.client.get(
f"/api/items/{item_id}", headers=self.headers, catch_response=True
) as response:
if response.status_code == 200 or response.status_code == 401:
response.success()
else:
response.failure("get_item_by_id failed")
return response.status_code == 401
@task(2)
@ensure_login
def update_item(self) -> bool:
if self.item_ids:
item_id = random.choice(self.item_ids)
update_data = {
"description": "Update description!",
"stock": random.randint(1, 50),
}
with self.client.patch(
f"/api/items/{item_id}",
headers=self.headers,
data=json.dumps(update_data),
catch_response=True,
) as response:
if response.status_code == 200 or response.status_code == 401:
response.success()
else:
response.failure("get_item_by_id failed")
return response.status_code == 401
@task(1)
@ensure_login
def delete_item(self) -> bool:
if self.item_ids:
item_id = random.choice(self.item_ids)
with self.client.delete(
f"/api/items/{item_id}", headers=self.headers, catch_response=True
) as response:
if response.status_code == 204:
response.success()
self.item_ids.remove(item_id)
elif response.status_code == 401:
response.success()
else:
response.failure("get_item_by_id failed")
return response.status_code == 401
@task(1)
@ensure_login
def get_all_users(self) -> bool:
with self.client.get("/api/users", headers=self.headers, catch_response=True) as response:
if response.status_code == 200:
response.success()
return False
```
### 100-10
+ result
+ RPS:50
+ response time: 20/70 ms
+ CPU 占用率大概 20%


```
postgres=# SELECT COUNT(*) FROM "User";
count
-------
100
(1 row)
postgres=# SELECT COUNT(*) FROM "Item";
count
-------
1059
(1 row)
```
### 200-10
+ result
+ RPS:100
+ response time: 40/80 ms
+ CPU 占用率大概 40%
+ ⚠️ Database 已經開始在警告 `FATAL: sorry, too many clients already`


```
postgres=# SELECT COUNT(*) FROM "User";
count
-------
200
(1 row)
postgres=# SELECT COUNT(*) FROM "Item";
count
-------
2739
(1 row)
```
### 300-10
+ result
+ RPS:150
+ response time: 50/150 ms
+ CPU 占用率大概 60%
+ ⚠️ 已經開始產生一些 failures 了


```
postgres=# SELECT COUNT(*) FROM "User";
count
-------
300
(1 row)
postgres=# SELECT COUNT(*) FROM "Item";
count
-------
2407
(1 row)
```
### 500-10
+ 比照原設定會被幹翻 (在前面緊張時期會衝出 1000 多個 Failure)
+ 加大力度 🔥
+ workers: 5
+ pgbouncer 登場
```yaml
PGBOUNCER_MAX_CLIENT_CONN: 500 # 最多 500 個 client 能與 pgbouncer 連線
PGBOUNCER_DEFAULT_POOL_SIZE: 90 # 每個資料庫能被 pgbouncer 分配到 90 條連線
```
+ result
+ response time: 50/100 ms
+ CPU 占用率大概 120%


```
postgres=# SELECT COUNT(*) FROM "User";
count
-------
500
(1 row)
postgres=# SELECT COUNT(*) FROM "Item";
count
-------
11915
(1 row)
```
### Conclusion
+ big traffic spike
+ 使用者擠在同一時間註冊和登入,延遲炸鍋了(不過後面的 CRUD 倒是非常穩定)
+ small traffic spike
+ 應該是因為使用者在同一時間登入,導致在後面同一時間要 refresh 過期的 access token