# 北榮微服務教學,以FHIR為例
2023.2.1,朱原嘉 Sky Chu
# 1.1 使用Python把欄位對到FHIR
```
import os
import psycopg2
from flask import Flask, jsonify
app = Flask(__name__)
# Connect to the PostgreSQL database
conn = psycopg2.connect(
host="localhost",
database="your_database_name",
user="your_user_name",
password="your_password"
)
#Connect to the PostgreSQL using environment variables
--conn = psycopg2.connect(
-- host=os.environ.get("DB_HOST"),
-- database=os.environ.get("DB_NAME"),
-- user=os.environ.get("DB_USER"),
-- password=os.environ.get("DB_PASSWORD")
--)
# Fetch data from the database and convert it to FHIR
def get_data_fhir():
cursor = conn.cursor()
cursor.execute("SELECT * FROM your_table_name")
data = cursor.fetchall()
fhir_data = []
for record in data:
# Convert each database record to a FHIR resource
fhir_record = {
"resourceType": "your_resource_type",
"id": record[0],
"field1": record[1],
"field2": record[2],
# Add additional fields as needed
}
fhir_data.append(fhir_record)
return fhir_data
# Define the RESTful endpoint to retrieve the FHIR data
@app.route("/fhir/data", methods=["GET"])
def get_data():
fhir_data = get_data_fhir()
return jsonify(fhir_data)
if __name__ == "__main__":
app.run()
```
註1.This code uses the psycopg2 library to connect to the PostgreSQL database and retrieve data from it. The data is then converted to FHIR using a custom function get_data_fhir(), which creates a list of dictionaries, where each dictionary represents a FHIR resource. The RESTful endpoint is defined using Flask's @app.route decorator, which allows you to specify the URL and HTTP method for accessing the data. The endpoint returns the FHIR data as a JSON object.
註2.You should also set the environment variables DB_HOST, DB_NAME, DB_USER, and DB_PASSWORD with the appropriate values for your database.
註3.記得, 檔名要存成app.py(對應到Dockerfile)
# 1.2 在Postgresql中,使用Store Procedure把欄位對到FHIR
```
CREATE OR REPLACE FUNCTION get_data_fhir()
RETURNS JSON AS $$
DECLARE
fhir_data JSON = '[]';
record RECORD;
BEGIN
FOR record IN (SELECT * FROM your_table_name)
LOOP
fhir_data = fhir_data || json_build_object(
'resourceType', 'your_resource_type',
'id', record.column1,
'field1', record.column2,
'field2', record.column3
-- Add additional fields as needed
);
END LOOP;
RETURN fhir_data;
END;
$$ LANGUAGE plpgsql;
```
資料查詢:
`SELECT * FROM get_data_fhir();`
# 1.3 在Postgresql中,使用Store Procedure把身份證號用MD5做UUID
```
-- Function to generate the MD5 hash of a string in PostgreSQL
CREATE OR REPLACE FUNCTION hash_string(input_string text)
RETURNS text
AS $$
BEGIN
RETURN encode(digest(input_string, 'md5'), 'hex');
END;
$$ LANGUAGE plpgsql;
-- Store procedure to hash a specific string "A123456789" using the function defined above
CREATE OR REPLACE FUNCTION hash_specific_string()
RETURNS text
AS $$
DECLARE
input_string text := 'A123456789';
hashed_string text;
BEGIN
hashed_string := hash_string(input_string);
RETURN hashed_string;
END;
$$ LANGUAGE plpgsql;
```
# 1.4 在Postgresql中,使用python把身份證號用MD5做UUID
```
import hashlib
# Function to generate the MD5 hash of a string in Python
def hash_string(input_string: str) -> str:
return hashlib.md5(input_string.encode()).hexdigest()
# Use the function to hash the specific string "A123456789"
input_string = "A123456789"
hashed_string = hash_string(input_string)
print(hashed_string)
```
# 1.5 使用DB2 store procedure把身份證號用MD5做UUID
> -- Declare a variable to store the MD5-encoded value
> DECLARE @md5Value VARCHAR(32);
>
> -- Encode the value "A123456789" with the MD5 algorithm
> SET @md5Value = HASH_MD5('A123456789');
>
> -- Insert the encoded value into an IBM DB2 table
> INSERT INTO db2_table (md5_column)
> VALUES (@md5Value);
>
# 1.6 使用DB2把欄位對到FHIR
```
CREATE OR REPLACE PROCEDURE get_data_fhir(OUT fhir_data CLOB(1M))
LANGUAGE SQL
BEGIN
DECLARE v_fhir_data VARCHAR(1024);
DECLARE v_resourceType VARCHAR(50);
DECLARE v_id INTEGER;
DECLARE v_field1 VARCHAR(50);
DECLARE v_field2 VARCHAR(50);
-- initialize fhir_data with "[]"
SET fhir_data = '[';
-- cursor to retrieve data from table
DECLARE cursor1 CURSOR FOR
SELECT column1, column2, column3
FROM your_table_name;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
-- loop through cursor and build fhir_data string
OPEN cursor1;
FETCH cursor1 INTO v_id, v_field1, v_field2;
WHILE (NOT done) DO
SET v_fhir_data = '{"resourceType": "your_resource_type",';
SET v_fhir_data = v_fhir_data || '"id": ' || v_id || ',';
SET v_fhir_data = v_fhir_data || '"field1": "' || v_field1 || '",';
SET v_fhir_data = v_fhir_data || '"field2": "' || v_field2 || '"},';
SET fhir_data = fhir_data || v_fhir_data;
FETCH cursor1 INTO v_id, v_field1, v_field2;
END WHILE;
CLOSE cursor1;
-- remove last comma from fhir_data string and add closing bracket
SET fhir_data = LEFT(fhir_data, LENGTH(fhir_data) - 1);
SET fhir_data = fhir_data || ']';
END;
```
# 2、要將此程式封裝為 Dockerfile
Here's an example of how you could package the Python program into a Docker container:
1.Create a Dockerfile:
```
# Use a Python base image
FROM python:3.8
# Set the working directory
WORKDIR /app
# Copy the required files to the working directory
COPY requirements.txt .
COPY app.py .
# Install the required packages
RUN pip install -r requirements.txt
# Specify the command to run when the container starts
CMD ["python", "app.py"]
```
2.Create a requirements.txt file to specify the required Python packages:
```
flask
psycopg2-binary
```
#註1:psycopg2建議直接使用binary的比較省事,不用拉source
#註2:但因為我們使用ibm power cpu, 還是要重新在ibm power cpu build一下
3.Build the Docker image using the following command:
`docker build -t myimage .`
4.Run the Docker container using the following command:
`docker run -p 5000:5000 myimage`
註3.如果使用環境變數,則使用下列指令執行
`docker run --env DB_HOST= --env DB_NAME= --env DB_USER= --env DB_PASSWORD= -p 5000:5000 myimage `
This will run the Python program in a Docker container, mapping port 5000 on the host to port 5000 in the container. You should now be able to access the RESTful endpoint by making a GET request to http://localhost:5000/fhir/data
# 3、把這個Docker放到k8s
Here's an example of how you could deploy the Docker container to a Kubernetes cluster:
1.Create a deployment.yml file:
```
apiVersion: apps/v1
kind: Deployment
metadata:
name: mydeployment
spec:
replicas: 1
selector:
matchLabels:
app: myapp
template:
metadata:
labels:
app: myapp
spec:
containers:
- name: mycontainer
image: myimage
ports:
- containerPort: 5000
env:
- name: DB_HOST
value:
- name: DB_NAME
value:
- name: DB_USER
value:
- name: DB_PASSWORD
value:
```
註1.如果會一次需要建立多個,label建議名稱要不同
註2.matchLabels還有selector內label名稱也要改成對應
2.Create a service.yml file:
```
apiVersion: v1
kind: Service
metadata:
name: myservice
spec:
selector:
app: myapp
ports:
- name: http
port: 5000
targetPort: 5000
type: ClusterIP
```
3.Apply the deployment and service to the cluster using the following commands:
```
kubectl apply -f deployment.yml
kubectl apply -f service.yml
```
# 4.AICS API for ICD -curl範例
> my_token=$(curl -H "Content-Type: application/json" --data '{"strategy":"local","email":"tv_med@headless.aics.asus.com","password":"wNDkLOTXfhtAFBdNRvGK","apiId":"med-service-api"}' 'https://aics-api.asus.com/apim-stage/authentication')
>
> acc_token=$(jq -r '.session' <<< ${my_token})
>
> patient_data=$(curl -X 'POST' 'https://aics-api.asus.com/api/icd10-stage/diagnosis?model_type=in_patient' -H 'accept: application/json' -H "ocp-apim-session-token: ${acc_token}" -H 'Content-Type: application/json' -d '{"model":"TV","notes":["left chronic otitis media status post"],"patient":{}}')
>
> echo ${patient_data} | jq -r '.[0]' | jq -r '.[0].icd'
> echo ${patient_data} | jq -r '.[0]' | jq -r '.[0].prob'****
註1.API URL:https://app.swaggerhub.com/apis-docs/henryyang42/Miraico/1.0.0#/
註2.先取得token, token只能用48小時, 重取
註3.丟入一段英文, 會出現主診斷ICD和機率
註4.在給HIS, 主診斷和CC, 做成web page
# 5.AICS API for ICD -ajax範例
```
// send a post request to the authentication endpoint
$.ajax({
type: "POST",
url: "https://aics-api.asus.com/apim-stage/authentication",
headers: {
"Content-Type": "application/json"
},
data: JSON.stringify({
strategy: "local",
email: "tv_med@headless.aics.asus.com",
password: "wNDkLOTXfhtAFBdNRvGK",
apiId: "med-service-api"
}),
success: function(response) {
// store the token in a variable
var my_token = response;
// extract the session from the response
var acc_token = response.session;
// send a post request to get the patient data
$.ajax({
type: "POST",
url: "https://aics-api.asus.com/api/icd10-stage/diagnosis?model_type=in_patient",
headers: {
"accept": "application/json",
"ocp-apim-session-token": acc_token,
"Content-Type": "application/json"
},
data: JSON.stringify({
model: "TV",
notes: ["left chronic otitis media status post"],//把醫師寫的那段英文, 貼在這裡
patient: {}
}),
success: function(patient_data) {
// store the patient data
var patient_data = patient_data;
}
});
}
});
```
# 6.AICS API for ICD-python範例
```
import requests
import json
# Set the request headers
headers = {
"Content-Type": "application/json",
"accept": "application/json",
"ocp-apim-session-token": ""
}
# Authenticate and get the token
data = {
"strategy": "local",
"email": "tv_med@headless.aics.asus.com",
"password": "wNDkLOTXfhtAFBdNRvGK",
"apiId": "med-service-api"
}
response = requests.post('https://aics-api.asus.com/apim-stage/authentication', headers=headers, data=json.dumps(data))
my_token = response.json()
# Get the access token
acc_token = my_token['session']
headers["ocp-apim-session-token"] = acc_token
# Get the patient data
data = {
"model": "TV",
"notes": [
"left chronic otitis media status post"
],
"patient": {}
}
response = requests.post('https://aics-api.asus.com/api/icd10-stage/diagnosis?model_type=in_patient', headers=headers, data=json.dumps(data))
patient_data = response.json()
```
# 6.1 使用 FastAPI 創建 REST API 以提供 biobert 模型的建議服務的 Python 程式碼。程式中也包括 Swagger 和 OAuth2 認證的實作。
1. 建立 FastAPI 應用程式物件,並設定 CORS 中介軟體。
2. 載入 biobert 模型和 tokenizer,以便在請求中使用。
3. 定義輸入和輸出的 Pydantic 模型,以便 FastAPI 可以自動驗證輸入並產生 Swagger API 文件。
4. 定義 OAuth2 相關的參數和函式,以便實作 OAuth2 認證。
5. 定義幾個 API 端點,包括生成存取權杖、提供建議的 morphology codes 等等。
6. 在 `suggest_morphology_codes` 函式中使用 biobert 模型進行推論,並將結果排序後回傳。在這個函式中,使用了 OAuth2 認證來驗證存取權杖。
```
from datetime import datetime, timedelta
from typing import Union,Any
import uvicorn
from fastapi import FastAPI, Header, HTTPException, status, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TextClassificationPipeline
import numpy as np
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "67479e3246f6bd6849888cd5621bab5449d3423a3ea024c4b81102eb8637fd45"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"user1": {
"username": "user1",
"full_name": "user1",
"email": "user1@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 定義 OAuth2 相關的參數
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
# 載入 biobert 模型和 tokenizer
model_name = "../icd10/checkpoints/CM_C22_time90_BioBERT/02-10+17.00"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=60)
pipe = TextClassificationPipeline(model=model, tokenizer=tokenizer)
#model.eval()
# 定義輸入請求的 Pydantic 模型
class InputText(BaseModel):
text: str
# 定義輸出回應的 Pydantic 模型
class OutputMorphologyCodes(BaseModel):
codes: Any
@app.post("/suggest-morphology-codes", response_model=OutputMorphologyCodes)
async def suggest_morphology_codes(
input_text: InputText,
):
# 將輸入的文字編碼成模型可接受的格式
input_ids = tokenizer.encode(input_text.text, return_tensors="pt")
# 執行模型推論
outputs = model(input_ids)
prediction = pipe(input_text.text, return_all_scores=True)
# 從模型輸出中取出推薦的標籤和對應的機率
logits = outputs.logits.detach().numpy()[0]
labels = model.config.id2label
codelist = np.loadtxt(f'../icd10/label/CM_c22_time90.csv', str, delimiter='\n') #載入診斷碼清單
#label_probs = [(labels[i], float(logits[i])) for i in range(len(labels))]
label_probs = [(i, float(prediction[0][i]['score'])) for i in range(len(labels))]
# 依機率由高到低排序
sorted_label_probs = sorted(label_probs, key=lambda x: x[1], reverse=True)
# 取出前10個機率最高的標籤
top_labels = [(codelist[int(label)], prob) for label, prob in sorted_label_probs[:10]]
# 回傳輸出回應
return OutputMorphologyCodes(codes=top_labels)
if __name__ == "__main__":
#uvicorn.run(app='main:app', host="127.0.0.1", port=8022, reload=True, workers=2)
uvicorn.run(app, host="0.0.0.0", port=8001)
```
# 6.2 AICS API for ICD-ajax範例,解決CROS問題
CORS (Cross-Origin Resource Sharing) policy 是由瀏覽器實行的安全機制,限制了不同源之間的網路資源存取,為了解決這個問題,可以加上以下的 jQuery code,讓 server 端開啟 CORS policy。
```
$.ajaxPrefilter(function(options, originalOptions, jqXHR) {
options.crossDomain ={
crossDomain: true
};
options.xhrFields = {
withCredentials: true
};
});
```
修改後的程式如下:
```
// send a post request to the authentication endpoint
$.ajaxPrefilter(function(options, originalOptions, jqXHR) {
options.crossDomain ={
crossDomain: true
};
options.xhrFields = {
withCredentials: true
};
});
$.ajax({
type: "POST",
url: "https://aics-api.asus.com/apim-stage/authentication",
headers: {
"Content-Type": "application/json"
},
data: JSON.stringify({
strategy: "local",
email: "tv_med@headless.aics.asus.com",
password: "wNDkLOTXfhtAFBdNRvGK",
apiId: "med-service-api"
}),
success: function(response) {
// store the token in a variable
var my_token = response;
// extract the session from the response
var acc_token = response.session;
// send a post request to get the patient data
$.ajax({
type: "POST",
url: "https://aics-api.asus.com/api/icd10-stage/diagnosis?model_type=in_patient",
headers: {
"accept": "application/json",
"ocp-apim-session-token": acc_token,
"Content-Type": "application/json"
},
data: JSON.stringify({
model: "TV",
notes: ["left chronic otitis media status post"],//把醫師寫的那段英文, 貼在這裡
patient: {}
}),
success: function(patient_data) {
// store the patient data
var patient_data = patient_data;
}
});
}
});
```
# 7.1 計算運動期間的平均動脈壓(Mean Arterial Pressure, MAP),MAP = DBP + 0.01exp (4.14–40.74/HR) (SBP-DBP) 。
```
# 計算MAP
def calc_map(sbp, dbp, hr):
st = 0.01 * math.exp(4.14 - 40.74 / hr)
map = dbp + st * (sbp - dbp)
return map
# 輸入安靜期間的收縮壓(Systolic Blood Pressure, SBP)和舒張壓(Diastolic Blood Pressure, DBP)
sbp = int(input("請輸入安靜期間的收縮壓: "))
dbp = int(input("請輸入安靜期間的舒張壓: "))
# 輸入運動期間的心率
exercise_hr = int(input("請輸入運動期間的心率: "))
# 計算MAP
map = calc_map(sbp, dbp, exercise_hr)
# 輸出MAP
print("運動期間的平均動脈壓 (MAP) 為: ", map, "mmHg")
```
# 7.2 計算收縮壓變異性指數(ARV),ARV=1∑Wk∑nK=2Wk×∣BPk−BPk−1∣
```
def calc_arv(bp_data):
# 初始化變數
n = len(bp_data)
w = [1 / n] * n
arv = 0
# 計算ARV
for k in range(1, n):
arv += w[k] * abs(bp_data[k] - bp_data[k-1])
arv /= sum(w)
return arv
# 測試資料
bp_data = [120, 122, 118, 126, 124, 128, 120, 122, 116, 124]
# 計算ARV
arv = calc_arv(bp_data)
# 輸出ARV
print("收縮壓變異性指數 (ARV) 為: ", arv)
```