###### tags: `FHIR`

# HAPI-FHIR Database 轉置資料 (二) - 實作

如果不太了解HAPI-FHIR的資料庫結構的朋友可以看看上一篇[HAPI-FHIR Database 轉置資料 (一) - Database結構](https://hackmd.io/@a5566qq123/SkgwqvZ6F)。

此篇將用TypeScript實作從HAPI FHIR資料庫轉置資料到另一個FHIR Server。

# 使用語言
- TypeScript

# 使用到的套件
- axios (用於API操作)
- pg
- sequelize

# 實作
以下為整體資料夾結構與內容

![](https://i.imgur.com/iZZnu9S.png)

- `config`: 放設定檔
- `models`: 放資料模型SQL、Interface
- `utils`: 公用，放資料操作CRUD等

## 設定檔
![](https://i.imgur.com/rwhzH7z.png)

- 路徑: `config/config.ts`
- 設定`SQL`、`FHIR`、`轉置資料`等設定

```typescript=
export const config = {
    db: {
        service: "postgres",    // only PostgreSQL is supported for now
        username: "username",   // PostgreSQL user name
        password: "password",   // PostgreSQL password
        database: "hapi",       // name of the HAPI-FHIR database to read from
        hostName: "localhost",  // PostgreSQL host name
    },
    sync: {
        limit: 100,             // page size: number of hfj_resource rows read per query (LIMIT)
        totalWorker: 5,         // number of pages processed concurrently
        FHIRBaseURL: "http://localhost:8088/fhir/", // target FHIR base URL; keep the trailing "/" because resource paths are appended to it directly
        haveAuthToken: true,    // set to true if the target FHIR server requires a Bearer token
        token: "token",         // Bearer token sent in the Authorization header when haveAuthToken is true
        method: "put"           // "put" = FHIR update (keeps the source resource id), "post" = FHIR create (the target server assigns a new id)
    }
}
```

## 定義SQL資料表
資料夾結構與內容

![](https://i.imgur.com/Fmi2Nrn.png)

### models/sql/hfj_res_sync.ts
- 此檔案為`hfj_res_sync`資料表的`Sequelize`程式碼定義。
- `hfj_res_sync`用於記錄已轉置資料之來源ID、Resource Type、Resource Version以及目標之ID。

```typescript=
import { Sequelize, DataTypes, Model } from 'sequelize';

/**
 * Define the `hfj_res_sync` table, which records every resource already copied to the target FHIR server.
 * @param sequelize the Sequelize instance to register the model on
 * @returns the model class created by `sequelize.define` (a `ModelStatic<Model>`)
 */
module.exports = (sequelize: Sequelize) => {
    const hfj_res_sync = sequelize.define('hfj_res_sync', {
        // id of the resource in HAPI-FHIR (hfj_resource.res_id).
        // Unique: each source resource has exactly one sync record (it is updated on re-sync), and the
        // resulting index speeds up the per-resource lookups by res_id.
        // NOTE: only applied when sync() creates the table; an existing table is not altered.
        res_id: {
            type: DataTypes.BIGINT,
            unique: true
        },
        // Resource type, e.g. Patient
        res_type: {
            type: DataTypes.STRING
        },
        // Resource version that was synced
        res_ver: {
            type: DataTypes.BIGINT
        },
        // id returned by the target FHIR server for the created/updated resource
        sync_id: {
            type: DataTypes.STRING
        }
    }, {
        timestamps: false,
        freezeTableName: true
    });
    return hfj_res_sync;
}
```

### models/sql/index.ts
- 匯出Sequelize主體物件的檔案，以操作資料表CRUD功能。
- 包含連接資料庫、創建資料表等。

```typescript=
import { Sequelize, Op, Dialect } from 'sequelize';
import { config } from '../../config/config';

const sequelize = new Sequelize(config.db.database, config.db.username, config.db.password, {
    host: config.db.hostName,
    dialect: config.db.service as Dialect, // taken from config.db.service ("postgres")
    // logging: false
});

// Register the hfj_res_sync model on this instance before it is synced below.
require('./hfj_res_sync')(sequelize);

/**
 * Promise resolving to the connected Sequelize instance, once authentication succeeded and the
 * `hfj_res_sync` table has been created if it did not exist yet. Other modules obtain it with
 * `await require('../models/sql/')`. The process exits if the database cannot be reached.
 */
module.exports = (async function () {
    try {
        await sequelize.authenticate();
        await sequelize.models['hfj_res_sync'].sync();
        console.log('Connection has been established successfully.');
        return sequelize;
    } catch (error) {
        console.error('Unable to connect to the database:', error);
        process.exit(1);
    }
})();
```

### models/resource.ts
從HAPI-FHIR `hfj_resource`取回資料之interface，這邊我們只會用到`res_id`以及`res_ver`。

```typescript=
/**
 * A row of HAPI-FHIR `hfj_resource` (only the columns used here).
 * NOTE: node-postgres returns BIGINT columns as strings by default, so at runtime these
 * values may be strings; compare them via String()/Number() rather than trusting the type.
 */
export interface IResource {
    res_id: number;
    res_ver: number;
}
```

### models/resource_ver.ts
從HAPI-FHIR `hfj_res_ver`取回資料之interface，這邊我們只會用到`res_id`、`res_ver`、`lo_get`。

```typescript=
export interface IResourceVer {
    res_id: number;
    res_ver: number;
    lo_get: Buffer; // gzip-compressed resource JSON read via lo_get(res_text)
}
```

## 資料操作
資料夾結構與內容

![](https://i.imgur.com/C6awhY0.png)

### utils/lob.ts
- 把HAPI-FHIR`hfj_res_ver`資料表中的`res_text`欄位LOB轉成JSON。
- 由於轉出來的LOB有經過gzip壓縮過，所以有一段Gunzip的程式碼。
- 解壓縮後的資料要先收集成完整的Buffer再轉成字串，避免中文等多位元組字元在chunk邊界被切斷而變成亂碼。

```typescript=
import stream from 'stream';
import zlib from 'zlib';

// Gunzip approach: https://stackoverflow.com/questions/12148948/how-do-i-ungzip-decompress-a-nodejs-requests-module-gzip-response-body
/**
 * Decompress the gzip-compressed `lo_get` bytes of an `hfj_res_ver` row and parse them as JSON.
 * @param iLOB a row returned by `getResource`; its `lo_get` field holds the gzip bytes
 * @returns a Promise of the parsed resource JSON; rejects if decompression or JSON parsing fails
 */
export const convertLOBToJson = (iLOB: any) => {
    return new Promise((resolve, reject) => {
        // Keep the raw Buffer chunks and decode once at the end: decoding chunk by chunk would
        // corrupt any multi-byte UTF-8 character (e.g. Chinese text) split across two chunks.
        const chunks: Buffer[] = [];
        const bufferStream = new stream.PassThrough();
        // Write the compressed bytes and close the input stream
        bufferStream.end(Buffer.from(iLOB.lo_get));
        // Decompress the binary returned by lo_get
        const gunzip = zlib.createGunzip();
        bufferStream.pipe(gunzip);
        gunzip.on('data', (chunk: Buffer) => {
            chunks.push(chunk);
        }).on('end', () => {
            try {
                resolve(JSON.parse(Buffer.concat(chunks).toString('utf8')));
            } catch (e) {
                // A throw inside an event handler would not reject this Promise; reject explicitly.
                reject(e);
            }
        }).on('error', (e) => {
            reject(e);
        });
    });
}
```

### utils/res.ts
- 取得在hfj_resource的資料
- 取得hfj_res_ver單個Resource資料

```typescript=
import { QueryTypes } from 'sequelize';
import { IResource } from '../models/resource';

/**
 * Fetch one page of `hfj_resource` rows: res_type, res_id and the latest res_ver.
 * @param limit SQL query data limit count
 * @param offset SQL query data offset
 */
export const getResources = async (limit: number, offset: number): Promise<Array<IResource>> => {
    const sequelize = await require('../models/sql/');
    // ORDER BY is required for stable pagination: without it PostgreSQL may return rows in a
    // different order on each query, so LIMIT/OFFSET pages could overlap or skip resources.
    let item = await sequelize.query(
        'SELECT res_type, res_id, res_ver FROM hfj_resource ORDER BY res_id LIMIT :limit OFFSET :offset;',
        { replacements: { limit, offset }, type: QueryTypes.SELECT }
    );
    return item;
}

/**
 * Fetch a single resource version from `hfj_res_ver`, reading its `res_text` large object via lo_get().
 * @param resId Resource id
 * @param resVer Resource Version
 * @returns the row `{ lo_get, res_id, res_ver }`, or `undefined` if that version does not exist
 */
export const getResource = async (resId: number, resVer: number) => {
    const sequelize = await require('../models/sql/');
    let item = await sequelize.query(
        'SELECT lo_get(res_text), res_id, res_ver FROM hfj_res_ver WHERE res_id = :resId AND res_ver = :resVer;',
        { replacements: { resId, resVer }, type: QueryTypes.SELECT }
    );
    return item.pop();
}

/**
 * Count all rows of `hfj_resource`.
 * NOTE: despite the name, the count is not grouped or filtered by resource type.
 */
export const getResourcesCountByResourceType = async (): Promise<number> => {
    const sequelize = await require('../models/sql/');
    let count = await sequelize.query(`SELECT COUNT(*) FROM hfj_resource`, { type: QueryTypes.SELECT });
    // COUNT(*) is a BIGINT and may come back as a string, so convert it explicitly
    return Number(count.pop().count);
}
```

### utils/res_sync.ts
- 取得hfj_res_sync某Resource id已轉置的資料
- 新增hfj_res_sync已轉置資料

```typescript=
import * as axios from "axios";
import { QueryTypes, Sequelize } from "sequelize";
import { config } from "../config/config";
import { IResource } from "../models/resource";
import { log } from '../utils/log';

/**
* Return the `hfj_res_sync` rows already recorded for a source resource id,
 * or an empty array `[]` if that resource has never been synced.
 * @param resId Resource id
 */
export const getSyncedResourceById = async (resId: number) => {
    const sequelize = await require('../models/sql/');
    let item = await sequelize.query(
        'SELECT res_id, res_ver FROM hfj_res_sync WHERE res_id = :resId;',
        { replacements: { resId }, type: QueryTypes.SELECT }
    );
    return item;
}

/**
 * Insert or update the `hfj_res_sync` record of a resource that was synced.
 * @param resourceType resource type e.g. Patient
 * @param syncId The target FHIR Server response id
 * @param resource the synced row; only its `res_id` and `res_ver` are stored
 * @returns `{ status: true, data: record }` on success, or `{ status: false, data: error }` after logging the error
 */
export const addSyncResource = async (resourceType: string, syncId: string, resource: IResource) => {
    try {
        // Build a new record instead of mutating the caller's `resource` object.
        const syncResource = {
            res_id: resource.res_id,
            res_ver: resource.res_ver,
            res_type: resourceType,
            sync_id: syncId
        };
        const sequelize: Sequelize = await require('../models/sql/');
        const syncModel = sequelize.models["hfj_res_sync"];
        const syncedResource = await syncModel.findOne({ where: { res_id: resource.res_id } });
        if (syncedResource) {
            await syncModel.update(syncResource, { where: { res_id: resource.res_id } });
        } else {
            await syncModel.create(syncResource);
        }
        return { status: true, data: syncResource };
    } catch (e) {
        log.error(e);
        return { status: false, data: e };
    }
}

/**
 * Build the axios request options for calls to the target FHIR server, adding an
 * `Authorization: Bearer <token>` header when `config.sync.haveAuthToken` is true.
 */
const buildRequestConfig = (): axios.AxiosRequestConfig => {
    const requestConfig: axios.AxiosRequestConfig = {};
    if (config.sync.haveAuthToken) {
        requestConfig.headers = { Authorization: `Bearer ${config.sync.token}` };
    }
    return requestConfig;
}

/**
 * Sync functions keyed by `config.sync.method`. Each resolves to the response body returned
 * by the target FHIR server, or `false` if the request failed (the error is logged).
 */
export const doSync: any = {
    /** Use the FHIR `update` interaction (PUT [base]/[type]/[id]), so the resource keeps its source id.
     * @param resourceType resource type e.g. Patient
     * @param resId Resource id
     * @param resJson Resource JSON content
     */
    "put": async (resourceType: string, resId: number, resJson: any) => {
        try {
            const res = await axios.default.put(`${config.sync.FHIRBaseURL}${resourceType}/${resId}`, resJson, buildRequestConfig());
            return res.data;
        } catch (e) {
            log.error(e);
            return false;
        }
    },
    /** Use the FHIR `create` interaction (POST [base]/[type]); the target server assigns a new id.
     * NOTE: the JSON is sent unchanged, so references inside other resources still use the source ids.
     * @param resourceType resource type e.g. Patient
     * @param resId Resource id (not used in the request: create does not keep the source id)
     * @param resJson Resource JSON content
     */
    "post": async (resourceType: string, resId: number, resJson: any) => {
        try {
            const res = await axios.default.post(`${config.sync.FHIRBaseURL}${resourceType}`, resJson, buildRequestConfig());
            return res.data;
        } catch (e) {
            log.error(e);
            return false;
        }
    }
}
```

# index.ts程式主體

```typescript=
import { convertLOBToJson } from './utils/lob';
import { getResources, getResourcesCountByResourceType, getResource } from './utils/res';
import { getSyncedResourceById, addSyncResource, doSync } from './utils/res_sync'
import { config } from './config/config';
import { IResource } from './models/resource';
import { IResourceVer } from './models/resource_ver';

/**
 * Decide whether a source resource must be (re)synced to the target FHIR server.
 * @param resource the hfj_resource row (res_id and its latest res_ver)
 * @returns true if the resource was never synced, or if the synced version differs from the latest one
 */
async function isNeedSync(resource: IResource): Promise<boolean> {
    const syncedResource = await getSyncedResourceById(resource.res_id);
    if (syncedResource.length === 0) {
        return true; // never synced before
    }
    // Sync only when the version changed. BIGINT columns may arrive as strings, so compare as strings.
    return String(syncedResource[0].res_ver) !== String(resource.res_ver);
}

/**
 * Run the queued jobs in batches of `config.sync.totalWorker`, waiting for each batch to settle.
 * Empties `workerList` in place (the caller relies on this to reuse the queue).
 * @param workerList queue of async job functions
 */
async function doWorks(workerList: Array<any>) {
    while (workerList.length > 0) {
        // splice removes the batch from workerList itself
        const batch = workerList.splice(0, config.sync.totalWorker);
        const results = await Promise.allSettled(batch.map(f => f()));
        // allSettled never throws, so report failed jobs explicitly instead of dropping them
        for (const result of results) {
            if (result.status === 'rejected') {
                console.error('worker failed:', result.reason);
            }
        }
    }
}

/**
 * Queue a job that syncs one page of hfj_resource rows.
 * @param worker queue of pending async job functions
 * @param resourceItemList the hfj_resource rows of this page
 * @param limit SQL query data limit
 * @param offset SQL query data offset
 */
function addWork(worker: Array<any>, resourceItemList: Array<any>, limit: number, offset: number) {
    worker.push(async () => {
        let successCount = 0;
        for (const resourceItem of resourceItemList) {
            try {
                // Check first, so the (possibly large) LOB is only loaded when it will actually be sent.
                if (!(await isNeedSync(resourceItem))) {
                    successCount++;
                    continue;
                }
                const syncResourceType = resourceItem.res_type;
                const resource = await getResource(resourceItem.res_id, resourceItem.res_ver);
                if (!resource) {
                    console.error(`hfj_res_ver row not found: res_id=${resourceItem.res_id}, res_ver=${resourceItem.res_ver}`);
                    continue;
                }
                const lobJson = await convertLOBToJson(resource);
                const syncResData = await doSync[config.sync.method](syncResourceType, resourceItem.res_id, lobJson);
                if (syncResData) {
                    const addResult = await addSyncResource(syncResourceType, syncResData.id, resource);
                    if (addResult.status) {
                        successCount++;
                    }
                }
            } catch (e) {
                // One bad resource (e.g. an empty or corrupt LOB) must not abort the rest of the page.
                console.error(`failed to sync res_id=${resourceItem.res_id}:`, e);
            }
        }
        console.log(`convert ${offset}~${offset+limit} successfully ${successCount}/${resourceItemList.length}`);
    });
}

/**
 * Entry point: page through hfj_resource, queue one sync job per page, and run the jobs
 * in batches of config.sync.totalWorker.
 */
(async () => {
    const worker: Array<any> = [];
    const limit = config.sync.limit;
    let offset = 0;
    // first page of hfj_resource
    let resourceItemList = await getResources(limit, offset);
    // continue until a page comes back empty
    while (resourceItemList.length > 0) {
        // when the queue is full, run the queued jobs concurrently (doWorks empties the queue)
        if (worker.length === config.sync.totalWorker) await doWorks(worker);
        addWork(worker, resourceItemList, limit, offset);
        // offset 0 with limit 100 covers rows 0~100; the next page starts at offset 100 (100~200), etc.
        offset += limit;
        // get next page of `hfj_resource` data
        resourceItemList = await getResources(limit, offset);
    }
    // run the remaining queued jobs
    await doWorks(worker);
})().catch((e) => {
    console.error('Sync aborted:', e);
    process.exit(1);
});
```

# 參考資料
- [How do I ungzip (decompress) a NodeJS request's module gzip response body?](https://stackoverflow.com/questions/12148948/how-do-i-ungzip-decompress-a-nodejs-requests-module-gzip-response-body)