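# Tzu Chi University, Department of Medical Informatics: Smart Healthcare Workshop (2023/12/22)
## AIoT and Smart Healthcare Applications (Quick Command Reference)
Instructor: Dr. Jack Hsu (許哲豪), 歐尼克斯實境互動工作室
Compiled 2023/12/22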
![FM636A](https://hackmd.io/_uploads/SybtS0JPT.jpg =240x)
Course kit: [【旗標─創客‧自造者工作坊─Python×AI 生醫感測健康大應用】](https://www.flag.com.tw/maker/FM636A) (includes one textbook plus the development board and sensors), [example code](https://www.flag.com.tw/bk/t/FM636A)
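**Note: This course is adapted from Chapters 1 to 6 of the original textbook. Participants who want to go deeper can consult the textbook and its example code.**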
[toc]
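## 1. Introduction to AIoT and Biomedical Sensors
## 2. The ESP32 Microcontroller and Development Environment Setup
1. Download Thonny, a Python development tool
https://thonny.org/
2. Download CH341SER, the USB-to-UART driver
http://www.wch.cn/downloads/CH341SER_EXE.html
3. Plug the small end of the USB cable into the development board and the large end into a USB port on the computer. Open Thonny, choose "Run" - "Tools/Configure interpreter" from the main menu, and switch to the "Interpreter" tab. Select "MicroPython (generic)" as the Thonny interpreter and "USB-Serial CH340 (COMx)" as the port; the number x is assigned by the system and differs from computer to computer.
4. When "MicroPython" appears in the Shell pane at the bottom of Thonny, the connection is configured.
5. Set up remote monitoring over Wi-Fi, which turns the development board into a web server whose values can be watched in a browser. (Choose one of the two methods below.)
* Connect both the development board and the laptop to the same Wi-Fi access point on a local network, then enter the private IP address dynamically assigned to the board in the browser to view the data. (This method does not work on the university's public wireless network.)
* Turn a personal Android phone into a Wi-Fi hotspot: turn off "Data Saver", go to "Network & internet" - "Hotspot & tethering", tap "Wi-Fi hotspot", set the "Hotspot name", the security type "WPA2 PSK", and the "Password", then switch the hotspot on. Next have the development board connect to the phone's hotspot to obtain a private IP address, and open that address in the phone's browser. To use the browser on a laptop instead, the laptop must also be connected to the phone's hotspot.
### 2.1 LED Blink Test (Single-Line Commands)
Enter the following commands in the Shell one line at a time to turn the LED on and off.
```python=
from machine import Pin # import the pin-control class
led = Pin(5, Pin.OUT) # configure pin 5 as an output
led.value(0) # drive the pin low: LED on
led.value(False) # LED on, same as the previous command
led.value(1) # drive the pin high: LED off
led.value(True) # LED off, same as the previous command
```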
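### 2.2 LED Blink Test (Full Program \CH02\LAB01.py)
1. Type the following program into the Thonny editor and press the green arrow button on the toolbar to run it.
2. To keep the program, press the disk button on the toolbar to save it. You can save either to this computer or to the MicroPython device. **Press the red square to stop the running program before saving.**
* This computer: store the file in any directory under any name of the form xxx.py.
* MicroPython device: the file is stored on the development board under any name of the form xxx.py. If it is named main.py, the board runs it automatically after a reset.
```python=
# import the Pin class from the machine module
from machine import Pin
# import the time module for delays
import time
# create a Pin object for pin 5, configured as an output, named led
led = Pin(5, Pin.OUT)
while True:
    led.value(1) # LED off
    time.sleep(0.5) # wait 0.5 s
    led.value(0) # LED on
    time.sleep(0.5) # wait 0.5 s
```
**Exercise 1: Change the blink durations and counts, for example to produce the SOS distress signal (three short, three long, three short flashes).**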
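One way to approach Exercise 1 is to describe the signal as data first and play it back afterwards. The sketch below is only an illustration: the helper name `sos_pattern` and the 0.2 s unit length are assumptions, not part of the course materials. It builds a list of `(led_on_seconds, led_off_seconds)` pairs and runs in plain Python, with the hardware playback shown as comments.

```python
def sos_pattern(unit=0.2):
    """Return (on_seconds, off_seconds) pairs for one SOS signal.

    Hypothetical helper: a short flash lasts one unit, a long flash
    three units, with one unit of darkness after every flash.
    """
    short = (unit, unit)
    long = (3 * unit, unit)
    return [short] * 3 + [long] * 3 + [short] * 3


pattern = sos_pattern()
print(len(pattern))  # number of flashes in one SOS signal

# On the board, the pattern could be played back like this
# (the LED on pin 5 is lit by writing 0, as in section 2.1):
#
#   for on_s, off_s in pattern:
#       led.value(0); time.sleep(on_s)
#       led.value(1); time.sleep(off_s)
```

Keeping the timing in a list makes it easy to try other patterns without touching the playback loop.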
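## 3. Galvanic Skin Response (GSR) Measurement Experiment
### 3.1 ADC Reading Test (Single-Line Commands)
Enter the following commands in the Shell one line at a time to read the ADC value (external voltage).
```python=
# import the Pin and ADC classes from the machine module
from machine import Pin, ADC
adc_pin = Pin(36) # 36 is the ESP32 VP pin
adc = ADC(adc_pin) # use pin 36 as the ADC input
adc.width(ADC.WIDTH_9BIT) # set the resolution in bits
adc.atten(ADC.ATTN_11DB) # set the attenuation, which determines the maximum input voltage
adc.read() # read the ADC value
```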
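The width setting determines the largest value `adc.read()` can return: an n-bit reading ranges from 0 to 2^n - 1. The sketch below is plain Python with hypothetical helper names (`adc_max`, `adc_fraction`). It lists the ranges for the 9-, 10-, and 12-bit settings used in this handout and shows how a raw reading could be normalized to a fraction of full scale.

```python
def adc_max(bits):
    """Largest raw value an ADC reading of the given bit width can take."""
    return (1 << bits) - 1


def adc_fraction(raw, bits):
    """Normalize a raw reading to the range 0.0 to 1.0 (hypothetical helper)."""
    return raw / adc_max(bits)


for bits in (9, 10, 12):
    print(bits, "bits -> 0 to", adc_max(bits))
```

This is why the lie-detector program in section 3.3 uses 511 as its upper bound: it runs the ADC at 9 bits.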
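### 3.2 Basic GSR Test (Full Program) (\CH03\LAB02.py)
Reads the ADC value (external voltage) every 0.1 s and prints it.
```python=
import time
from machine import Pin, ADC
adc_pin = Pin(36) # 36 is the ESP32 VP pin
adc = ADC(adc_pin) # use pin 36 as the ADC input
adc.width(ADC.WIDTH_12BIT) # set the resolution in bits
adc.atten(ADC.ATTN_11DB) # set the attenuation, which determines the maximum input voltage
while True: # loop forever, reading repeatedly
    gsr = adc.read()
    print(gsr)
    time.sleep(0.1)
```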
### 3.3 Wireless Lie-Detector Test (Full Program) (\CH03\LAB03.py)
Before running the program, upload the following files to the MicroPython device.
* ESPWebServer.py: web server module
* index.html: monitoring web page (\CH03\上傳資料)
Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")
Take the address printed in the Shell ("Connected, IP: xxx.xxx.xxx.xxx") and open it in a browser on a phone or computer (Edge, Firefox, Chrome...) to watch the value change in real time.
```python=
# import timing helpers from the utime module
from utime import ticks_ms, ticks_diff
from machine import Pin, ADC
import network, ESPWebServer
adc_pin = Pin(36) # 36 is the ESP32 VP pin
adc = ADC(adc_pin) # use pin 36 as the ADC input
adc.width(ADC.WIDTH_9BIT) # set the resolution in bits
adc.atten(ADC.ATTN_11DB) # set the attenuation, which determines the maximum input voltage
angle = 180 # gauge angle derived from the GSR reading
def SendAngle(socket, args): # handler for the /lie request
    ESPWebServer.ok(socket, "200", str(angle))
# map the GSR reading onto the range 180 to 360
def gsr_to_angle(raw_val, min_val, max_val):
    raw_val *= -1
    new_val = ((raw_val + max_val)
               /(max_val - min_val)*(360 - 180) + 180)
    return new_val
print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
# replace with your own access point name (SSID) and password
sta.connect("SSID", "password")
while not sta.isconnected():
    pass
print("Connected, IP:", sta.ifconfig()[0])
ESPWebServer.begin(80) # start the web server
ESPWebServer.onPath("/lie", SendAngle) # register the request handler
time_mark = ticks_ms() # record the current time
while True:
    # keep checking for new requests
    ESPWebServer.handleClient()
    # run the task once more than 100 ms have passed since time_mark
    if ticks_diff(ticks_ms(), time_mark) > 100:
        gsr = adc.read() # read the ADC value
        angle = gsr_to_angle(gsr, 400, 511)
        time_mark = ticks_ms() # reset the timer
```
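To see what `gsr_to_angle` does, the same arithmetic can be run on a PC with a few sample readings. With the 9-bit range and the bounds 400 and 511 used in the program, a reading of 511 maps to 180 and a reading of 400 maps to 360, so a lower reading gives a larger angle. The `clamp` helper is an assumption added for illustration; the course program does not limit out-of-range readings.

```python
def gsr_to_angle(raw_val, min_val, max_val):
    # Same arithmetic as in the lie-detector program
    raw_val *= -1
    return (raw_val + max_val) / (max_val - min_val) * (360 - 180) + 180


def clamp(value, low, high):
    """Hypothetical helper: keep value inside [low, high]."""
    return max(low, min(high, value))


for raw in (511, 400, 300):
    angle = gsr_to_angle(raw, 400, 511)
    print(raw, angle, clamp(angle, 180, 360))
```

A reading below 400 produces an angle above 360, which is the case the optional clamp would catch.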
## 4. Blood Oxygen Saturation (SpO2) Measurement Experiment
### 4.1 SpO2 Reading (Direct Display) (\CH04\LAB04.py)
Place your index finger on the sensor first, then press the "Run" button on the toolbar. Readings start to appear in the Shell after about 15 seconds; nothing is printed while no reading is available. Try exhaling and then holding your breath for a while to observe the difference.
```python=
from machine import SoftI2C, Pin
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter
my_SCL_pin = 25 # I2C SCL pin
my_SDA_pin = 26 # I2C SDA pin
i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))
sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()
pox = Pulse_oximeter(sensor) # SpO2 calculation class
count = 0
while True:
    pox.update()
    spo2 = pox.get_spo2()
    count = count+1
    if spo2 > 0:
        print("SpO2:", spo2, "%")
    # else:
    #     print(count)
```
### 4.2 SpO2 Reading (Web Display) (\CH04\LAB05.py)
Before running the program, upload the following files to the MicroPython device.
* ESPWebServer.py: web server module
* max30102.py: SpO2 sensor driver
* pulse_oximeter.py: SpO2 calculation functions
* circular_buffer.py: buffer helper used by the SpO2 code
* index.html: monitoring web page (\CH04\上傳資料)
Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")
Take the address printed in the Shell ("Connected, IP: xxx.xxx.xxx.xxx") and open it in a browser on a phone or computer (Edge, Firefox, Chrome...) to watch the value change in real time.
```python=
# import timing helpers from the utime module
from utime import ticks_ms, ticks_diff
from machine import SoftI2C, Pin
import network, ESPWebServer
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter
my_SCL_pin = 25 # I2C SCL pin
my_SDA_pin = 26 # I2C SDA pin
i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))
sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()
pox = Pulse_oximeter(sensor) # SpO2 calculation class
spo2 = 0
def SendSpo2(socket, args): # handler for the /measure request
    ESPWebServer.ok(socket, "200", str(spo2))
print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect("SSID", "password")
while not sta.isconnected():
    pass
print("Connected, IP:", sta.ifconfig()[0])
ESPWebServer.begin(80) # start the web server
ESPWebServer.onPath("/measure", SendSpo2)
time_mark = ticks_ms()
while True:
    ESPWebServer.handleClient()
    pox.update()
    spo2_tmp = pox.get_spo2()
    spo2_tmp = round(spo2_tmp, 1)
    if spo2_tmp > 0: # valid reading: keep it and restart the timer
        time_mark = ticks_ms()
        spo2 = spo2_tmp
        print("SpO2:", spo2, "%")
    if ticks_diff(ticks_ms(), time_mark) > 5000: # no valid reading for 5 s
        spo2 = 0
```
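The main loop of this program keeps the last valid SpO2 value for up to 5 seconds and then resets it to 0, so the web page does not keep showing a stale reading after the finger is removed. Below is a minimal sketch of that hold-and-expire logic as a plain Python class that can be tried without hardware. The class name `HoldLast` and the explicit `now_ms` argument are assumptions for illustration; on the board the time would come from `ticks_ms()` and `ticks_diff()`.

```python
class HoldLast:
    """Keep the last positive reading until it is older than timeout_ms."""

    def __init__(self, timeout_ms=5000):
        self.timeout_ms = timeout_ms
        self.value = 0
        self.mark = 0  # time of the last valid reading, in ms

    def update(self, reading, now_ms):
        if reading > 0:
            # Valid reading: store it and restart the timer
            self.value = reading
            self.mark = now_ms
        elif now_ms - self.mark > self.timeout_ms:
            # Last valid reading is too old: report "no reading"
            self.value = 0
        return self.value


hold = HoldLast()
print(hold.update(97.5, 1000))  # a valid reading arrives
print(hold.update(0, 4000))     # 3 s without a reading
print(hold.update(0, 7000))     # 6 s without a reading
```

Plain subtraction is enough for this desktop sketch; on the microcontroller `ticks_diff` should be used because the tick counter wraps around.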
## 5. Pulse and Heart Rate (PPG) Measurement Experiment
### 5.1 Heart Rate Reading (Direct Display) (\CH05\LAB06.py)
Place your index finger on the sensor first, then press the "Run" button on the toolbar. After 3 to 5 seconds, once the filter has settled on a stable DC value, readings start to appear in the Shell; nothing is printed while no reading is available.
```python=
from machine import SoftI2C, Pin
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter, IIR_filter
my_SCL_pin = 25 # I2C SCL pin
my_SDA_pin = 26 # I2C SDA pin
i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))
sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()
pox = Pulse_oximeter(sensor)
dc_extractor = IIR_filter(0.99) # extracts the DC component
while True:
    pox.update() # update the oximeter module
    if pox.available():
        red_val = pox.get_raw_red()
        red_dc = dc_extractor.step(red_val)
        ppg = int(red_dc*1.01 - red_val)
        print(ppg)
```
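The program subtracts the raw red-LED value from a slowly varying baseline produced by `IIR_filter(0.99)`, which leaves the pulsating part of the signal. The source of `IIR_filter` is not listed in this handout, so the class below is an assumed stand-in: a first-order low-pass filter, y = a*y + (1 - a)*x. It illustrates the idea rather than reproducing the actual module, and the sample values are made up.

```python
class SimpleIIR:
    """First-order IIR low-pass filter (assumed stand-in for IIR_filter)."""

    def __init__(self, alpha):
        self.alpha = alpha  # closer to 1 = slower, smoother baseline
        self.y = None

    def step(self, x):
        if self.y is None:
            self.y = x  # start from the first sample
        else:
            self.y = self.alpha * self.y + (1 - self.alpha) * x
        return self.y


dc = SimpleIIR(0.99)
samples = [1000, 1000, 990, 1000, 1000, 985, 1000]  # made-up raw values
for raw in samples:
    baseline = dc.step(raw)
    # baseline - raw is positive while the raw value dips below the baseline
    print(raw, round(baseline, 2), round(baseline - raw, 2))
```

With a coefficient of 0.99 the baseline barely moves during a single dip, which is what makes the difference usable as a pulse waveform.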
### 5.2 Heart Rate Reading (Web Display) (\CH05\LAB07.py)
Before running the program, upload the following files to the MicroPython device. This step is the same as in example 4.2; if you have already done it, the first four files can be skipped, but the fifth item (index.html) must still be uploaded.
* ESPWebServer.py: web server module
* max30102.py: SpO2 sensor driver
* pulse_oximeter.py: SpO2 calculation functions
* circular_buffer.py: buffer helper used by the SpO2 code
* index.html: monitoring web page (\CH05\上傳資料)
Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")
Take the address printed in the Shell ("Connected, IP: xxx.xxx.xxx.xxx") and open it in a browser on a phone or computer (Edge, Firefox, Chrome...) to watch the waveform (values) change in real time.
```python=
import _thread
from utime import ticks_ms, ticks_diff
from machine import SoftI2C, Pin
import network, ESPWebServer
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter, IIR_filter
led = Pin(5, Pin.OUT)
led.value(1)
my_SCL_pin = 25 # I2C SCL pin
my_SDA_pin = 26 # I2C SDA pin
i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))
sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()
pox = Pulse_oximeter(sensor)
dc_extractor = IIR_filter(0.99) # extracts the DC component
thresh_generator = IIR_filter(0.9) # produces the dynamic threshold
is_beating = False # flag: a beat is currently in progress
beat_time_mark = ticks_ms() # time of the last beat
heart_rate = 0
num_beats = 0 # number of beats counted so far
target_n_beats = 3 # update the heart rate once every this many beats
tot_intval = 0 # accumulated time between beats
ppg = 0
def cal_heart_rate(intval, target_n_beats=3):
    intval /= 1000
    heart_rate = target_n_beats/(intval/60)
    heart_rate = round(heart_rate, 1)
    return heart_rate
def SendHrRate(socket, args): # handler for the /hr request
    ESPWebServer.ok(socket, "200", str(heart_rate))
def SendEcg(socket, args): # handler for the /line request
    ESPWebServer.ok(socket, "200", str(ppg))
def web_thread(): # worker-thread function that serves web requests
    while True:
        ESPWebServer.handleClient()
print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect("SSID", "password")
while not sta.isconnected():
    pass
print("Connected, IP:", sta.ifconfig()[0])
ESPWebServer.begin(80) # start the web server
ESPWebServer.onPath("/hr", SendHrRate) # register the request handler
ESPWebServer.onPath("/line", SendEcg) # register the request handler
_thread.start_new_thread(web_thread, ()) # start the worker thread
while True: # main thread
    pox.update() # update the oximeter module
    if pox.available():
        red_val = pox.get_raw_red()
        red_dc = dc_extractor.step(red_val)
        ppg = max(int(red_dc*1.01 - red_val), 0)
        thresh = thresh_generator.step(ppg)
        if ppg > (thresh + 20) and not is_beating:
            is_beating = True
            led.value(0)
            intval = ticks_diff(ticks_ms(), beat_time_mark)
            if 2000 > intval > 270:
                tot_intval += intval
                num_beats += 1
                if num_beats == target_n_beats:
                    heart_rate = cal_heart_rate(
                        tot_intval, target_n_beats)
                    print(heart_rate)
                    tot_intval = 0
                    num_beats = 0
            else:
                tot_intval = 0
                num_beats = 0
            beat_time_mark = ticks_ms()
        elif ppg < thresh:
            is_beating = False
            led.value(1)
```
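`cal_heart_rate` converts the total time taken by `target_n_beats` beats into beats per minute. Running the same arithmetic on a PC makes it concrete: three beats spread over 2400 ms correspond to 75 beats per minute. The interval window accepted in the main loop (more than 270 ms and less than 2000 ms between beats) likewise corresponds to roughly 222 down to 30 beats per minute. The helper `interval_to_bpm` is added here for illustration only.

```python
def cal_heart_rate(intval, target_n_beats=3):
    # Same arithmetic as in the program: intval is in milliseconds
    intval /= 1000
    heart_rate = target_n_beats / (intval / 60)
    return round(heart_rate, 1)


def interval_to_bpm(interval_ms):
    """Hypothetical helper: beats per minute for one beat-to-beat interval."""
    return round(60000 / interval_ms, 1)


print(cal_heart_rate(2400))    # three beats in 2.4 s
print(interval_to_bpm(270))    # shortest accepted interval
print(interval_to_bpm(2000))   # longest accepted interval
```

Intervals outside the window are treated as noise or a missed beat, which is why the program resets its counters when one occurs.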
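## 6. Electrocardiogram (ECG) Measurement Experiment
### 6.1 ECG Reading (Direct Display) (\CH06\LAB08.py)
Attach the yellow electrode to the left hand, the red one to the right hand, and the green one to the left leg. Every 50 ms the program prints the largest reading seen in that interval; nothing is shown in the Shell when no reading is available.
```python=
from utime import ticks_ms, ticks_diff
from machine import Pin, ADC
adc_pin = Pin(36) # 36 is the ESP32 VP pin
adc = ADC(adc_pin) # use pin 36 as the ADC input
adc.width(ADC.WIDTH_10BIT) # set the resolution in bits
adc.atten(ADC.ATTN_11DB) # set the attenuation, which determines the maximum input voltage
max_val = 0 # largest value seen in the current window
time_mark = ticks_ms() # record the current time
while True:
    raw_val = adc.read()
    if raw_val > max_val:
        max_val = raw_val
    if ticks_diff(ticks_ms(), time_mark) > 50:
        ecg = max_val
        print(ecg)
        max_val = 0
        time_mark = ticks_ms() # reset the timer
```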
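The loop in this program works as a peak-hold downsampler: it samples the ADC as fast as it can, remembers the largest value, and reports that maximum about once every 50 ms so that short peaks are not missed. Below is a sketch of the same idea applied to recorded data. The function name `window_max` and the `(time_ms, value)` sample format are assumptions made for this example, and the sample values are made up.

```python
def window_max(samples, window_ms=50):
    """Return the maximum value seen in each window.

    samples: iterable of (time_ms, value) pairs in time order.
    A window closes when a sample arrives more than window_ms after
    the window started, mirroring the ticks_diff(...) > 50 test.
    """
    out = []
    max_val = 0
    mark = None
    for t, v in samples:
        if mark is None:
            mark = t  # the first sample starts the first window
        if v > max_val:
            max_val = v
        if t - mark > window_ms:
            out.append(max_val)
            max_val = 0
            mark = t
    return out


# Made-up samples taken every 20 ms
data = [(0, 10), (20, 300), (40, 15), (60, 12), (80, 11), (100, 500), (120, 9)]
print(window_max(data))
```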
### 6.2 ECG Reading (Web Display) (\CH06\LAB09.py)
Before running the program, upload the following files to the MicroPython device. This step is the same as in example 4.2; if you have already done it, the first four files can be skipped, but the fifth item (index.html) must still be uploaded.
* ESPWebServer.py: web server module
* max30102.py: SpO2 sensor driver
* pulse_oximeter.py: SpO2 calculation functions
* circular_buffer.py: buffer helper used by the SpO2 code
* index.html: monitoring web page (\CH06\上傳資料)
Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")
Take the address printed in the Shell ("Connected, IP: xxx.xxx.xxx.xxx") and open it in a browser on a phone or computer (Edge, Firefox, Chrome...) to watch the waveform (values) change in real time.
```python=
import _thread
from utime import ticks_ms, ticks_diff
from machine import Pin, ADC
import network, ESPWebServer
from pulse_oximeter import IIR_filter
buzzer = Pin(2, Pin.OUT)
adc_pin = Pin(36) # 36 is the ESP32 VP pin
adc = ADC(adc_pin) # use pin 36 as the ADC input
adc.width(ADC.WIDTH_10BIT) # set the resolution in bits
adc.atten(ADC.ATTN_11DB) # set the attenuation, which determines the maximum input voltage
thresh_generator = IIR_filter(0.9) # produces the dynamic threshold
is_beating = False # flag: a beat is currently in progress
beat_time_mark = ticks_ms() # time of the last beat
heart_rate = 0
num_beats = 0 # number of beats counted so far
target_n_beats = 3 # update the heart rate once every this many beats
tot_intval = 0 # accumulated time between beats
max_val = 0
ecg = 0
def cal_heart_rate(intval, target_n_beats=3):
    intval /= 1000
    heart_rate = target_n_beats/(intval/60)
    heart_rate = round(heart_rate, 1)
    return heart_rate
def SendHrRate(socket, args): # handler for the /hr request
    ESPWebServer.ok(socket, "200", str(heart_rate))
def SendEcg(socket, args): # handler for the /line request
    ESPWebServer.ok(socket, "200", str(ecg))
def web_thread(): # worker-thread function that serves web requests
    while True:
        ESPWebServer.handleClient()
print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect("SSID", "password")
while not sta.isconnected():
    pass
print("Connected, IP:", sta.ifconfig()[0])
ESPWebServer.begin(80) # start the web server
ESPWebServer.onPath("/hr", SendHrRate) # register the request handler
ESPWebServer.onPath("/line", SendEcg) # register the request handler
_thread.start_new_thread(web_thread, ())
time_mark = ticks_ms()
while True:
    raw_val = adc.read()
    if raw_val > max_val:
        max_val = raw_val
    if ticks_diff(ticks_ms(), time_mark) > 50:
        ecg = max_val
        thresh = thresh_generator.step(ecg)
        if ecg > (thresh + 100) and not is_beating:
            is_beating = True
            buzzer.value(1)
            intval = ticks_diff(ticks_ms(), beat_time_mark)
            if 2000 > intval > 270:
                tot_intval += intval
                num_beats += 1
                if num_beats == target_n_beats:
                    heart_rate = cal_heart_rate(
                        tot_intval, target_n_beats)
                    print(heart_rate)
                    tot_intval = 0
                    num_beats = 0
            else:
                tot_intval = 0
                num_beats = 0
            beat_time_mark = ticks_ms()
        elif ecg < thresh:
            is_beating = False
            buzzer.value(0)
        max_val = 0
        time_mark = ticks_ms()
```
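Beat detection in this program compares each sample with a dynamic threshold that follows the signal: a beat starts when the sample rises well above the threshold and ends when it drops below it. The sketch below reproduces that state machine on a synthetic signal. It assumes the threshold filter is an exponential moving average starting at 0 (the real `IIR_filter` source is not listed in this handout), and the function name `detect_beats` is hypothetical.

```python
def detect_beats(samples, alpha=0.9, margin=100):
    """Return the indexes at which a beat starts.

    The threshold is an exponential moving average of the signal
    (an assumed stand-in for IIR_filter). A beat starts when a sample
    rises more than `margin` above it and ends when it falls below it.
    """
    thresh = 0.0
    is_beating = False
    beats = []
    for i, x in enumerate(samples):
        thresh = alpha * thresh + (1 - alpha) * x  # update threshold first
        if x > thresh + margin and not is_beating:
            is_beating = True
            beats.append(i)
        elif x < thresh:
            is_beating = False
    return beats


# Synthetic signal: flat baseline with one spike every 10 samples
signal = ([0] * 5 + [500] + [0] * 4) * 3
beats = detect_beats(signal)
print(beats)
```

With one sample every 50 ms, spikes 10 samples apart are 500 ms apart, which corresponds to 120 beats per minute.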
## Appendix: External Modules
### circular_buffer.py
```python=
from ucollections import deque
class CircularBuffer(object):
    ''' Very simple implementation of a circular buffer based on deque '''
    def __init__(self, max_size):
        self.data = deque((), max_size, True)
        self.max_size = max_size
    def __len__(self):
        return len(self.data)
    def is_empty(self):
        return not bool(self.data)
    def append(self, item):
        try:
            self.data.append(item)
        except IndexError:
            # deque full, popping 1st item out
            self.data.popleft()
            self.data.append(item)
    def pop(self):
        return self.data.popleft()
    def clear(self):
        self.data = deque((), self.max_size, True)
    def pop_head(self):
        buffer_size = len(self.data)
        temp = self.data
        if buffer_size == 1:
            pass
        elif buffer_size > 1:
            self.data.clear()
            for x in range(buffer_size - 1):
                self.data = temp.popleft()
        else:
            return 0
        return temp.popleft()
```
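`CircularBuffer.append` keeps only the most recent `max_size` items by discarding the oldest one when the buffer is full. On a PC the same behaviour can be tried with CPython's `collections.deque` and its `maxlen` argument. This is a desktop illustration, not a replacement for the MicroPython module.

```python
from collections import deque

buf = deque(maxlen=4)       # four slots, like STORAGE_QUEUE_SIZE in max30102.py
for sample in range(1, 7):  # push six samples into a four-slot buffer
    buf.append(sample)      # the oldest item is dropped automatically when full

print(list(buf))            # the four most recent samples
oldest = buf.popleft()      # comparable to CircularBuffer.pop()
print(oldest, len(buf))
```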
### ESPWebServer.py
```python=
"""A simple HTTP server that only accept GET request
It adopts the programming style of the ESP8266WebServer
library in ESP8266 Arduino Core
"""
import network
import machine
import socket
import uselect
import os
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Used for checking for a new client connection
poller = uselect.poll()
# Dict of registered handlers for all paths
handlers = {}
# Handler function for requests that are not found
notFoundHandler = None
# The path to the web documents on the MicroPython filesystem
docPath = "/"
# Data for templates
tplData = {}
def begin(port):
    """Function to start http server
    """
    global server, poller
    server.bind(('0.0.0.0', port))
    server.listen(1)
    # Register for checking new client connection
    poller.register(server, uselect.POLLIN)
def close():
    """Function to stop http server
    """
    poller.unregister(server)
    server.close()
def handleClient():
    """Check for new client connection and process the request
    """
    global server, poller
    # Note: don't call poll() with 0, that would randomly cause
    # reset with "Fatal exception 28(LoadProhibitedCause)" message
    res = poller.poll(1)
    if res:  # There's a new client connection
        (socket, sockaddr) = server.accept()
        handle(socket)
        socket.close()
def __sendPage(socket, filePath):
    """Send the file as webpage to client
    """
    try:
        f = open(filePath, "rb")
        while True:
            data = f.read(64)
            if (data == b""):
                break
            socket.write(data)
        f.close()
    except Exception as e:
        print(e)
def err(socket, code, message):
    """Respond with an error message to the client
    """
    socket.write("HTTP/1.1 " + code + " " + message + "\r\n\r\n")
    socket.write("<h1>" + message + "</h1>")
def ok(socket, code, msg):
    """Respond with a success message or a webpage to the client
    """
    socket.write("HTTP/1.1 " + code + " OK\r\n\r\n")
    if __fileExist(msg):
        filePath = msg
        __sendPage(socket, filePath)
    else:
        socket.write(msg)
def __fileExist(path):
    """Check for file existence
    """
    # print(path)
    try:
        stat = os.stat(path)
        # stat[0] bit 15 / 14 -> file/dir
        if stat[0] & 0x8000 == 0x8000:  # file
            # print("Found.")
            return True
        else:  # Dir
            return False
    except:
        # print("Not Found.")
        return False
def handle(socket):
    """Process a new GET request
    """
    global docPath, handlers
    currLine = str(socket.readline(), 'utf-8')
    request = currLine.split(" ")
    if len(request) != 3:  # Discarded if it's a bad header
        return
    (method, url, version) = request
    if "?" in url:  # Check if there's a query string
        # Split at the first "?" only, so unpacking always gets two parts
        (path, query) = url.split("?", 1)
    else:
        (path, query) = (url, "")
    args = {}
    if query:  # Parse the query string
        argPairs = query.split("&")
        for argPair in argPairs:
            arg = argPair.split("=")
            args[arg[0]] = arg[1]
    while True:  # Read until blank line after header
        header = socket.readline()
        if header == b"":
            return
        if header == b"\r\n":
            break
    # Check for supported HTTP version
    if version != "HTTP/1.0\r\n" and version != "HTTP/1.1\r\n":
        err(socket, "505", "Version Not Supported")
    elif method != "GET":  # Only accept GET request
        err(socket, "501", "Not Implemented")
    elif path in handlers:  # Check for registered path
        handlers[path](socket, args)
    #elif not path.startswith(docPath): # Check for path to any document
    #    err(socket, "400", "Bad Request")
    else:
        filePath = path
        # find the file
        if not __fileExist(filePath):
            filePath = path + ("index.html" if path.endswith("/") else "/index.html")
            # find index.html in the path
            if not __fileExist(filePath):
                filePath = path + ("index.p.html" if path.endswith("/") else "/index.p.html")
                # find index.p.html in the path
                if not __fileExist(filePath):
                    if notFoundHandler:
                        notFoundHandler(socket)
                    else:
                        err(socket, "404", "Not Found")
                    return
        # Send the response header first
        socket.write("HTTP/1.1 200 OK\r\n\r\n")
        # Then send the file content
        if filePath.endswith(".p.html"):
            # print("template file.")
            f = open(filePath, "r")
            for l in f:
                socket.write(l.format(**tplData))
            f.close()
        else:
            __sendPage(socket, filePath)
def onPath(path, handler):
    """Register handler for processing request of specified path
    """
    global handlers
    handlers[path] = handler
def onNotFound(handler):
    """Register handler for processing request of not found path
    """
    global notFoundHandler
    notFoundHandler = handler
def setDocPath(path):
    """Set the path to documents' directory
    """
    global docPath
    docPath = path
def setTplData(data):
    """Set data for template
    """
    global tplData
    tplData = data
```
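The `handle` function in ESPWebServer.py splits the first line of an HTTP request into method, URL, and version, and then separates the path from the query string. The sketch below shows the same parsing steps in plain Python so they can be tried on a PC. The function name `parse_request_line` and the `/led?state=on&pin=5` URL are made up for this example; unlike the module, the sketch tolerates a query item without `=`.

```python
def parse_request_line(line):
    """Split an HTTP request line into method, path, and query arguments."""
    method, url, version = line.strip().split(" ")
    if "?" in url:
        path, query = url.split("?", 1)  # split at the first "?" only
    else:
        path, query = url, ""
    args = {}
    for pair in (query.split("&") if query else []):
        key, _, value = pair.partition("=")  # a missing "=" gives an empty value
        args[key] = value
    return method, path, args


print(parse_request_line("GET /lie HTTP/1.1\r\n"))
print(parse_request_line("GET /led?state=on&pin=5 HTTP/1.1\r\n"))
```

The returned `path` is what the module looks up in `handlers`, and `args` is the dictionary passed as the second argument to handlers such as `SendAngle`.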
### max30102.py
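With the `setup_sensor` defaults in this driver (sample rate 400, averaging over 8 samples), the chip delivers 400 / 8 = 50 averaged samples per second, so a new sample is expected about every 20 ms. The sketch below repeats the arithmetic of the driver's `update_acquisition_frequency` method in plain Python; the function name `acquisition_interval_ms` is hypothetical.

```python
from math import ceil


def acquisition_interval_ms(sample_rate, sample_avg):
    """Return (samples per second, milliseconds between samples)."""
    freq = sample_rate / sample_avg
    return freq, int(ceil(1000 / freq))


print(acquisition_interval_ms(400, 8))
print(acquisition_interval_ms(100, 4))
```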
```python=
# This work is a lot based on:
# - https://github.com/sparkfun/SparkFun_MAX3010x_Sensor_Library
# Written by Peter Jansen and Nathan Seidle (SparkFun)
# This is a library written for the Maxim MAX30102 Optical Smoke Detector
# It should also work with the MAX30102, which has a Green LED, too.
# These sensors use I2C to communicate, as well as a single (optional)
# interrupt line that is not currently supported in this driver.
# Written by Peter Jansen and Nathan Seidle (SparkFun)
# BSD license, all text above must be included in any redistribution.
#
# - https://github.com/kandizzy/esp32-micropython/blob/master/PPG/ppg/MAX30102.py
# A port of the library to MicroPython by kandizzy
#
# This driver aims at giving almost full access to Maxim MAX30102 functionalities.
# n-elia
import uerrno
from machine import SoftI2C
from ustruct import unpack
from utime import sleep_ms, ticks_diff, ticks_ms
from circular_buffer import CircularBuffer
# I2C address (7-bit address)
MAX3010X_I2C_ADDRESS = 0x57 # Right-shift of 0xAE, 0xAF
# Status Registers
MAX30102_INT_STAT_1 = 0x00
MAX30102_INT_STAT_2 = 0x01
MAX30102_INT_ENABLE_1 = 0x02
MAX30102_INT_ENABLE_2 = 0x03
# FIFO Registers
MAX30102_FIFO_WRITE_PTR = 0x04
MAX30102_FIFO_OVERFLOW = 0x05
MAX30102_FIFO_READ_PTR = 0x06
MAX30102_FIFO_DATA = 0x07
# Configuration Registers
MAX30102_FIFO_CONFIG = 0x08
MAX30102_MODE_CONFIG = 0x09
MAX30102_PARTICLE_CONFIG = 0x0A # Sometimes listed as 'SPO2' in datasheet (pag.11)
MAX30102_LED1_PULSE_AMP = 0x0C # IR
MAX30102_LED2_PULSE_AMP = 0x0D # RED
MAX30102_LED_PROX_AMP = 0x10
MAX30102_MULTI_LED_CONFIG_1 = 0x11
MAX30102_MULTI_LED_CONFIG_2 = 0x12
# Die Temperature Registers
MAX30102_DIE_TEMP_INT = 0x1F
MAX30102_DIE_TEMP_FRAC = 0x20
MAX30102_DIE_TEMP_CONFIG = 0x21
# Proximity Function Registers
MAX30102_PROX_INT_THRESH = 0x30
# Part ID Registers
MAX30102_REVISION_ID = 0xFE
MAX30102_PART_ID = 0xFF # Should always be 0x15. Identical for MAX30102.
# MAX30102 Commands
# Interrupt configuration (datasheet pag 13, 14)
MAX30102_INT_A_FULL_MASK = ~0b10000000
MAX30102_INT_A_FULL_ENABLE = 0x80
MAX30102_INT_A_FULL_DISABLE = 0x00
MAX30102_INT_DATA_RDY_MASK = ~0b01000000
MAX30102_INT_DATA_RDY_ENABLE = 0x40
MAX30102_INT_DATA_RDY_DISABLE = 0x00
MAX30102_INT_ALC_OVF_MASK = ~0b00100000
MAX30102_INT_ALC_OVF_ENABLE = 0x20
MAX30102_INT_ALC_OVF_DISABLE = 0x00
MAX30102_INT_PROX_INT_MASK = ~0b00010000
MAX30102_INT_PROX_INT_ENABLE = 0x10
MAX30102_INT_PROX_INT_DISABLE = 0x00
MAX30102_INT_DIE_TEMP_RDY_MASK = ~0b00000010
MAX30102_INT_DIE_TEMP_RDY_ENABLE = 0x02
MAX30102_INT_DIE_TEMP_RDY_DISABLE = 0x00
# FIFO data queue configuration
MAX30102_SAMPLE_AVG_MASK = ~0b11100000
MAX30102_SAMPLE_AVG_1 = 0x00
MAX30102_SAMPLE_AVG_2 = 0x20
MAX30102_SAMPLE_AVG_4 = 0x40
MAX30102_SAMPLE_AVG_8 = 0x60
MAX30102_SAMPLE_AVG_16 = 0x80
MAX30102_SAMPLE_AVG_32 = 0xA0
MAX30102_ROLLOVER_MASK = 0xEF
MAX30102_ROLLOVER_ENABLE = 0x10
MAX30102_ROLLOVER_DISABLE = 0x00
# Mask for 'almost full' interrupt (defaults to 32 samples)
MAX30102_A_FULL_MASK = 0xF0
# Mode configuration commands (page 19)
MAX30102_SHUTDOWN_MASK = 0x7F
MAX30102_SHUTDOWN = 0x80
MAX30102_WAKEUP = 0x00
MAX30102_RESET_MASK = 0xBF
MAX30102_RESET = 0x40
MAX30102_MODE_MASK = 0xF8
MAX30102_MODE_IR_ONLY = 0x02
MAX30102_MODE_IR_RED = 0x03
MAX30102_MODE_MULTI_LED = 0x07
# Particle sensing configuration commands (pgs 19-20)
MAX30102_ADC_RANGE_MASK = 0x9F
MAX30102_ADC_RANGE_2048 = 0x00
MAX30102_ADC_RANGE_4096 = 0x20
MAX30102_ADC_RANGE_8192 = 0x40
MAX30102_ADC_RANGE_16384 = 0x60
MAX30102_SAMPLERATE_MASK = 0xE3
MAX30102_SAMPLERATE_50 = 0x00
MAX30102_SAMPLERATE_100 = 0x04
MAX30102_SAMPLERATE_200 = 0x08
MAX30102_SAMPLERATE_400 = 0x0C
MAX30102_SAMPLERATE_800 = 0x10
MAX30102_SAMPLERATE_1000 = 0x14
MAX30102_SAMPLERATE_1600 = 0x18
MAX30102_SAMPLERATE_3200 = 0x1C
MAX30102_PULSE_WIDTH_MASK = 0xFC
MAX30102_PULSE_WIDTH_69 = 0x00
MAX30102_PULSE_WIDTH_118 = 0x01
MAX30102_PULSE_WIDTH_215 = 0x02
MAX30102_PULSE_WIDTH_411 = 0x03
# LED brightness level. It affects the distance of detection.
MAX30102_PULSE_AMP_LOWEST = 0x02 # 0.4mA - Presence detection of ~4 inch
MAX30102_PULSE_AMP_LOW = 0x1F # 6.4mA - Presence detection of ~8 inch
MAX30102_PULSE_AMP_MEDIUM = 0x7F # 25.4mA - Presence detection of ~8 inch
MAX30102_PULSE_AMP_HIGH = 0xFF # 50.0mA - Presence detection of ~12 inch
# Multi-LED Mode configuration (datasheet pag 22)
MAX30102_SLOT1_MASK = 0xF8
MAX30102_SLOT2_MASK = 0x8F
MAX30102_SLOT3_MASK = 0xF8
MAX30102_SLOT4_MASK = 0x8F
SLOT_NONE = 0x00
SLOT_IR_LED = 0x01
SLOT_RED_LED = 0x02
SLOT_NONE_PILOT = 0x04
SLOT_IR_PILOT = 0x05
SLOT_RED_PILOT = 0x06
MAX30102_EXPECTED_PART_ID = 0x15
TAG = 'MAX30102'
# Size of the queued readings
STORAGE_QUEUE_SIZE = 4
# Data structure to hold the last readings
class SensorData:
    def __init__(self):
        self.ir = CircularBuffer(STORAGE_QUEUE_SIZE)
        self.red = CircularBuffer(STORAGE_QUEUE_SIZE)
        self.green = CircularBuffer(STORAGE_QUEUE_SIZE)
# Sensor class
class MAX30102(object):
    def __init__(self,
                 i2c: SoftI2C,
                 i2c_hex_address=MAX3010X_I2C_ADDRESS,
                 ):
        self._address = i2c_hex_address
        self._i2c = i2c
        self._active_leds = None
        self._pulse_width = None
        self._multi_led_read_mode = None
        # Store current config values to compute acquisition frequency
        self._sample_rate = None
        self._sample_avg = None
        self._acq_frequency = None
        self._acq_frequency_inv = None
        # Circular buffer of readings from the sensor
        self.sense = SensorData()
        try:
            # self._i2c.readfrom(self._address, 1) # Some boards won't work if scan() is never called
            dev_list = self._i2c.scan()
            if not self._address in dev_list:
                raise OSError(uerrno.ENODEV)
        except OSError as error:
            if error.errno == uerrno.ENODEV:
                raise RuntimeError("Sensor not found on I2C bus.")
            else:
                raise RuntimeError("Error while reading from I2C bus: OSError code " + str(error.errno))
        if not (self.check_part_id()):
            raise RuntimeError("I2C device ID not corresponding to MAX30102")
    # Sensor setup method
    def setup_sensor(self, led_mode=2, adc_range=16384, sample_rate=400,
                     led_power=MAX30102_PULSE_AMP_MEDIUM, sample_avg=8,
                     pulse_width=411):
        # Reset the sensor's registers from previous configurations
        self.soft_reset()
        # Set the number of samples to be averaged by the chip (default 8)
        self.set_fifo_average(sample_avg)
        # Allow FIFO queues to wrap/roll over
        self.enable_fifo_rollover()
        # Set the LED mode to the default value of 2 (IR + RED)
        # Note: only modes 1 and 2 are accepted by set_led_mode()
        self.set_led_mode(led_mode)
        # Set the ADC range to default value of 16384
        self.set_adc_range(adc_range)
        # Set the sample rate to the default value of 400
        self.set_sample_rate(sample_rate)
        # Set the Pulse Width to the default value of 411
        self.set_pulse_width(pulse_width)
        # Set the LED brightness to the default value of 'medium'
        self.set_pulse_amplitude_ir(led_power)
        self.set_pulse_amplitude_red(led_power)
        self.set_pulse_amplitude_proximity(led_power)
        # Clears the FIFO
        self.clear_fifo()
    def __del__(self):
        self.shutdown()
    # Methods to read the two interrupt flags
    def get_int_1(self):
        # Load the Interrupt 1 status (configurable) from the register
        rev_id = self.i2c_read_register(MAX30102_INT_STAT_1)
        return rev_id
    def get_int_2(self):
        # Load the Interrupt 2 status (DIE_TEMP_DRY) from the register
        rev_id = self.i2c_read_register(MAX30102_INT_STAT_2)
        return rev_id
    # Methods to set up the interrupt flags
    def enable_a_full(self):
        # Enable the almost full interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_A_FULL_MASK, MAX30102_INT_A_FULL_ENABLE)
    def disable_a_full(self):
        # Disable the almost full interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_A_FULL_MASK, MAX30102_INT_A_FULL_DISABLE)
    def enable_data_rdy(self):
        # Enable the new FIFO data ready interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_DATA_RDY_MASK, MAX30102_INT_DATA_RDY_ENABLE)
    def disable_data_rdy(self):
        # Disable the new FIFO data ready interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_DATA_RDY_MASK, MAX30102_INT_DATA_RDY_DISABLE)
    def enable_alc_ovf(self):
        # Enable the ambient light limit interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_ALC_OVF_MASK, MAX30102_INT_ALC_OVF_ENABLE)
    def disable_alc_ovf(self):
        # Disable the ambient light limit interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_ALC_OVF_MASK, MAX30102_INT_ALC_OVF_DISABLE)
    def enable_prox_int(self):
        # Enable the proximity interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_PROX_INT_MASK, MAX30102_INT_PROX_INT_ENABLE)
    def disable_prox_int(self):
        # Disable the proximity interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1, MAX30102_INT_PROX_INT_MASK, MAX30102_INT_PROX_INT_DISABLE)
    def enable_die_temp_rdy(self):
        # Enable the die temp. conversion finish interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_2, MAX30102_INT_DIE_TEMP_RDY_MASK, MAX30102_INT_DIE_TEMP_RDY_ENABLE)
    def disable_die_temp_rdy(self):
        # Disable the die temp. conversion finish interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_2, MAX30102_INT_DIE_TEMP_RDY_MASK, MAX30102_INT_DIE_TEMP_RDY_DISABLE)
    # Configuration reset
    def soft_reset(self):
        # When the RESET bit is set to one, all configuration, threshold,
        # and data registers are reset to their power-on-state through
        # a power-on reset. The RESET bit is cleared automatically back to zero
        # after the reset sequence is completed. (datasheet pag. 19)
        self.set_bitmask(MAX30102_MODE_CONFIG, MAX30102_RESET_MASK, MAX30102_RESET)
        curr_status = -1
        while not ((curr_status & MAX30102_RESET) == 0):
            sleep_ms(10)
            curr_status = ord(self.i2c_read_register(MAX30102_MODE_CONFIG))
    # Power states methods
    def shutdown(self):
        # Put IC into low power mode (datasheet pg. 19)
        # During shutdown the IC will continue to respond to I2C commands but
        # will not update with or take new readings (such as temperature).
        self.set_bitmask(MAX30102_MODE_CONFIG, MAX30102_SHUTDOWN_MASK, MAX30102_SHUTDOWN)
    def wakeup(self):
        # Pull IC out of low power mode (datasheet pg. 19)
        self.set_bitmask(MAX30102_MODE_CONFIG, MAX30102_SHUTDOWN_MASK, MAX30102_WAKEUP)
    # LED Configuration
    def set_led_mode(self, LED_mode):
        # Set LED mode: select which LEDs are used for sampling
        # Options: IR only, IR + RED (datasheet pag. 19)
        if LED_mode == 1:
            self.set_bitmask(MAX30102_MODE_CONFIG, MAX30102_MODE_MASK, MAX30102_MODE_IR_ONLY)
        elif LED_mode == 2:
            self.set_bitmask(MAX30102_MODE_CONFIG, MAX30102_MODE_MASK, MAX30102_MODE_IR_RED)
        else:
            raise ValueError('Wrong LED mode:{0}!'.format(LED_mode))
        # Multi-LED Mode Configuration: enable the reading of the LEDs
        # depending on the chosen mode
        self.enable_slot(1, SLOT_IR_LED)
        if LED_mode > 1:
            self.enable_slot(2, SLOT_RED_LED)
        # Store the LED mode used to control how many bytes to read from
        # FIFO buffer in multiLED mode: a sample is made of 3 bytes
        self._active_leds = LED_mode
        self._multi_led_read_mode = LED_mode * 3
    # ADC Configuration
    def set_adc_range(self, ADC_range):
        # ADC range: set the range of the conversion
        # Options: 2048, 4096, 8192, 16384
        # Current draw: 7.81pA. 15.63pA, 31.25pA, 62.5pA per LSB.
        if ADC_range == 2048:
            r = MAX30102_ADC_RANGE_2048
        elif ADC_range == 4096:
            r = MAX30102_ADC_RANGE_4096
        elif ADC_range == 8192:
            r = MAX30102_ADC_RANGE_8192
        elif ADC_range == 16384:
            r = MAX30102_ADC_RANGE_16384
        else:
            raise ValueError('Wrong ADC range:{0}!'.format(ADC_range))
        self.set_bitmask(MAX30102_PARTICLE_CONFIG, MAX30102_ADC_RANGE_MASK, r)
    # Sample Rate Configuration
    def set_sample_rate(self, sample_rate):
        # Sample rate: select the number of samples taken per second.
        # Options: 50, 100, 200, 400, 800, 1000, 1600, 3200
        # Note: in theory, the resulting acquisition frequency for the end user
        # is sampleRate/sampleAverage. However, it is worth testing it before
        # assuming that the sensor can effectively sustain that frequency
        # given its configuration.
        if sample_rate == 50:
            sr = MAX30102_SAMPLERATE_50
        elif sample_rate == 100:
            sr = MAX30102_SAMPLERATE_100
        elif sample_rate == 200:
            sr = MAX30102_SAMPLERATE_200
        elif sample_rate == 400:
            sr = MAX30102_SAMPLERATE_400
        elif sample_rate == 800:
            sr = MAX30102_SAMPLERATE_800
        elif sample_rate == 1000:
            sr = MAX30102_SAMPLERATE_1000
        elif sample_rate == 1600:
            sr = MAX30102_SAMPLERATE_1600
        elif sample_rate == 3200:
            sr = MAX30102_SAMPLERATE_3200
        else:
            raise ValueError('Wrong sample rate:{0}!'.format(sample_rate))
        self.set_bitmask(MAX30102_PARTICLE_CONFIG, MAX30102_SAMPLERATE_MASK, sr)
        # Store the sample rate and recompute the acq. freq.
        self._sample_rate = sample_rate
        self.update_acquisition_frequency()
    # Pulse width Configuration
    def set_pulse_width(self, pulse_width):
        # Pulse width of LEDs: The longer the pulse width the longer range of
        # detection. At 69us and 0.4mA it's about 2 inches,
        # at 411us and 0.4mA it's about 6 inches.
        if pulse_width == 69:
            pw = MAX30102_PULSE_WIDTH_69
        elif pulse_width == 118:
            pw = MAX30102_PULSE_WIDTH_118
        elif pulse_width == 215:
            pw = MAX30102_PULSE_WIDTH_215
        elif pulse_width == 411:
            pw = MAX30102_PULSE_WIDTH_411
        else:
            raise ValueError('Wrong pulse width:{0}!'.format(pulse_width))
        self.set_bitmask(MAX30102_PARTICLE_CONFIG, MAX30102_PULSE_WIDTH_MASK, pw)
        # Store the pulse width
        self._pulse_width = pw
# LED Pulse Amplitude Configuration methods
def set_active_leds_amplitude(self, amplitude):
if self._active_leds > 0:
self.set_pulse_amplitude_ir(amplitude)
if self._active_leds > 1:
self.set_pulse_amplitude_red(amplitude)
def set_pulse_amplitude_ir(self, amplitude):
self.i2c_set_register(MAX30102_LED1_PULSE_AMP, amplitude)
def set_pulse_amplitude_red(self, amplitude):
self.i2c_set_register(MAX30102_LED2_PULSE_AMP, amplitude)
def set_pulse_amplitude_proximity(self, amplitude):
self.i2c_set_register(MAX30102_LED_PROX_AMP, amplitude)
def set_proximity_threshold(self, thresh_msb):
# Set the IR ADC count that will trigger the beginning of particle-
# sensing mode.The threshMSB signifies only the 8 most significant-bits
# of the ADC count. (datasheet page 24)
self.i2c_set_register(MAX30102_PROX_INT_THRESH, thresh_msb)
# FIFO averaged samples number Configuration
def set_fifo_average(self, number_of_samples):
# FIFO sample avg: set the number of samples to be averaged by the chip.
# Options: MAX30102_SAMPLE_AVG_1, 2, 4, 8, 16, 32
if number_of_samples == 1:
ns = MAX30102_SAMPLE_AVG_1
elif number_of_samples == 2:
ns = MAX30102_SAMPLE_AVG_2
elif number_of_samples == 4:
ns = MAX30102_SAMPLE_AVG_4
elif number_of_samples == 8:
ns = MAX30102_SAMPLE_AVG_8
elif number_of_samples == 16:
ns = MAX30102_SAMPLE_AVG_16
elif number_of_samples == 32:
ns = MAX30102_SAMPLE_AVG_32
else:
raise ValueError(
'Wrong number of samples:{0}!'.format(number_of_samples))
self.set_bitmask(MAX30102_FIFO_CONFIG, MAX30102_SAMPLE_AVG_MASK, ns)
# Store the number of averaged samples and recompute the acq. freq.
self._sample_avg = number_of_samples
self.update_acquisition_frequency()

    def update_acquisition_frequency(self):
        if None in [self._sample_rate, self._sample_avg]:
            return
        else:
            self._acq_frequency = self._sample_rate / self._sample_avg
            from math import ceil
            # Compute the time interval to wait before taking a good measure
            # (see note in setSampleRate() method)
            self._acq_frequency_inv = int(ceil(1000 / self._acq_frequency))

    def get_acquisition_frequency(self):
        return self._acq_frequency

    def clear_fifo(self):
        # Resets all pointers to start in a known state
        # Datasheet page 15 recommends clearing FIFO before beginning a read
        self.i2c_set_register(MAX30102_FIFO_WRITE_PTR, 0)
        self.i2c_set_register(MAX30102_FIFO_OVERFLOW, 0)
        self.i2c_set_register(MAX30102_FIFO_READ_PTR, 0)

    def enable_fifo_rollover(self):
        # FIFO rollover: enable to allow the FIFO to wrap/roll over
        self.set_bitmask(MAX30102_FIFO_CONFIG, MAX30102_ROLLOVER_MASK, MAX30102_ROLLOVER_ENABLE)

    def disable_fifo_rollover(self):
        # FIFO rollover: disable to prevent the FIFO from wrapping/rolling over
        self.set_bitmask(MAX30102_FIFO_CONFIG, MAX30102_ROLLOVER_MASK, MAX30102_ROLLOVER_DISABLE)

    def set_fifo_almost_full(self, number_of_samples):
        # Set number of samples to trigger the almost full interrupt (page 18)
        # Power on default is 32 samples. Note it is reverse: 0x00 is
        # 32 samples, 0x0F is 17 samples
        self.set_bitmask(MAX30102_FIFO_CONFIG, MAX30102_A_FULL_MASK, number_of_samples)

    def get_write_pointer(self):
        # Read the FIFO Write Pointer from the register
        wp = self.i2c_read_register(MAX30102_FIFO_WRITE_PTR)
        return wp

    def get_read_pointer(self):
        # Read the FIFO Read Pointer from the register
        rp = self.i2c_read_register(MAX30102_FIFO_READ_PTR)
        return rp

    # Die Temperature method: returns the temperature in C
    def read_temperature(self):
        # DIE_TEMP_RDY interrupt must be enabled
        # Config die temperature register to take 1 temperature sample
        self.i2c_set_register(MAX30102_DIE_TEMP_CONFIG, 0x01)
        # Poll for bit to clear, reading is then complete
        reading = ord(self.i2c_read_register(MAX30102_INT_STAT_2))
        sleep_ms(100)
        while (reading & MAX30102_INT_DIE_TEMP_RDY_ENABLE) > 0:
            reading = ord(self.i2c_read_register(MAX30102_INT_STAT_2))
            sleep_ms(1)
        # Read die temperature register (integer)
        tempInt = ord(self.i2c_read_register(MAX30102_DIE_TEMP_INT))
        # Causes the clearing of the DIE_TEMP_RDY interrupt
        tempFrac = ord(self.i2c_read_register(MAX30102_DIE_TEMP_FRAC))
        # Calculate temperature (datasheet page 23)
        return float(tempInt) + (float(tempFrac) * 0.0625)

    def set_prox_int_tresh(self, val):
        # Set the PROX_INT_THRESH (see proximity function on datasheet, page 10)
        self.i2c_set_register(MAX30102_PROX_INT_THRESH, val)

    # DeviceID and Revision methods
    def read_part_id(self):
        # Load the Device ID from the register
        part_id = self.i2c_read_register(MAX30102_PART_ID)
        return part_id

    def check_part_id(self):
        # Checks the correctness of the Device ID
        part_id = ord(self.read_part_id())
        return part_id == MAX30102_EXPECTED_PART_ID

    def get_revision_id(self):
        # Load the Revision ID from the register
        rev_id = self.i2c_read_register(MAX30102_REVISION_ID)
        return ord(rev_id)

    # Time slots management for multi-LED operation mode
    def enable_slot(self, slot_number, device):
        # In multi-LED mode, each sample is split into up to four time slots,
        # SLOT1 through SLOT4. These control registers determine which LED is
        # active in each time slot. (datasheet page 22)
        # Devices are SLOT_RED_LED or SLOT_RED_PILOT (proximity)
        # Assigning a SLOT_RED_LED will pulse LED
        # Assigning a SLOT_RED_PILOT will detect the proximity
        if slot_number == 1:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_1, MAX30102_SLOT1_MASK, device)
        elif slot_number == 2:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_1, MAX30102_SLOT2_MASK, device << 4)
        elif slot_number == 3:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_2, MAX30102_SLOT3_MASK, device)
        elif slot_number == 4:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_2, MAX30102_SLOT4_MASK, device << 4)
        else:
            raise ValueError('Wrong slot number:{0}!'.format(slot_number))

    def disable_slots(self):
        # Clear all the slots assignments
        self.i2c_set_register(MAX30102_MULTI_LED_CONFIG_1, 0)
        self.i2c_set_register(MAX30102_MULTI_LED_CONFIG_2, 0)

    # Low-level I2C Communication
    def i2c_read_register(self, REGISTER, n_bytes=1):
        self._i2c.writeto(self._address, bytearray([REGISTER]))
        return self._i2c.readfrom(self._address, n_bytes)

    def i2c_set_register(self, REGISTER, VALUE):
        self._i2c.writeto(self._address, bytearray([REGISTER, VALUE]))
        return

    # Given a register, read it, mask it, and then write the new value back
    def set_bitmask(self, REGISTER, MASK, NEW_VALUES):
        newCONTENTS = (ord(self.i2c_read_register(REGISTER)) & MASK) | NEW_VALUES
        self.i2c_set_register(REGISTER, newCONTENTS)
        return

    # Given a register, read it, mask it, and write back the slot assignment
    def bitmask(self, reg, slotMask, thing):
        originalContents = ord(self.i2c_read_register(reg))
        originalContents = originalContents & slotMask
        self.i2c_set_register(reg, originalContents | thing)

    # Convert one 3-byte big-endian FIFO sample to an integer
    def fifo_bytes_to_int(self, fifo_bytes):
        value = unpack(">i", b'\x00' + fifo_bytes)
        return (value[0] & 0x3FFFF) >> self._pulse_width

    # Returns how many samples are available
    def available(self):
        number_of_samples = len(self.sense.ir)
        return number_of_samples

    # Get a new IR value
    def get_ir(self):
        # Check the sensor for new data for 250ms
        if self.safe_check(250):
            return self.sense.ir.pop_head()
        else:
            # Sensor failed to find new data
            return 0

    # Get a new red value
    def get_red(self):
        # Check the sensor for new data for 250ms
        if self.safe_check(250):
            return self.sense.red.pop_head()
        else:
            # Sensor failed to find new data
            return 0

    # Note: the following 3 functions are the equivalent of using 'getFIFO'
    # methods of the SparkFun library
    # Pops the next IR value in storage (if available)
    def pop_ir_from_storage(self):
        if len(self.sense.ir) == 0:
            return 0
        else:
            return self.sense.ir.pop()

    # Pops the next red value in storage (if available)
    def pop_red_from_storage(self):
        if len(self.sense.red) == 0:
            return 0
        else:
            return self.sense.red.pop()

    # (not needed here; kept only for comparison with the SparkFun library)
    def next_sample(self):
        if self.available():
            # With respect to the SparkFun library, using a deque object
            # allows us to avoid manually advancing the tail
            return True

    # Polls the sensor for new data
    def check(self):
        # Call continuously to poll the sensor for new data.
        read_pointer = ord(self.get_read_pointer())
        write_pointer = ord(self.get_write_pointer())
        # Do we have new data?
        if read_pointer != write_pointer:
            # Calculate the number of readings we need to get from sensor
            number_of_samples = write_pointer - read_pointer
            # Wrap condition (return to the beginning of 32 samples)
            if number_of_samples < 0:
                number_of_samples += 32
            for i in range(number_of_samples):
                # Read a number of bytes equal to activeLEDs*3 (= 1 sample)
                fifo_bytes = self.i2c_read_register(MAX30102_FIFO_DATA,
                                                    self._multi_led_read_mode)
                # Convert the readings from bytes to integers, depending
                # on the number of active LEDs
                if self._active_leds > 0:
                    self.sense.ir.append(
                        self.fifo_bytes_to_int(fifo_bytes[0:3])
                    )
                if self._active_leds > 1:
                    self.sense.red.append(
                        self.fifo_bytes_to_int(fifo_bytes[3:6])
                    )
                return True
        else:
            return False

    # Check for new data but give up after a certain amount of time
    def safe_check(self, max_time_to_check):
        mark_time = ticks_ms()
        while True:
            if ticks_diff(ticks_ms(), mark_time) > max_time_to_check:
                # Timeout reached
                return False
            if self.check():
                # new data found
                return True
            sleep_ms(1)
```
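
The FIFO handling in `check()` and `fifo_bytes_to_int()` comes down to two small calculations: counting the unread samples between the read and write pointers (with wrap-around at 32), and turning three big-endian bytes into an integer masked to 18 bits. The sketch below reproduces both on a desktop Python interpreter so they can be checked without the sensor attached. The function `samples_pending` and the `shift` parameter are names made up for this illustration; the driver itself shifts by its stored pulse-width setting.

```python
from struct import unpack


def fifo_bytes_to_int(fifo_bytes, shift=0):
    # Pad the 3-byte big-endian sample to 4 bytes so it can be unpacked
    # as a 32-bit integer, then keep only the low 18 bits.
    value = unpack(">i", b"\x00" + fifo_bytes)[0]
    # 'shift' is a hypothetical stand-in for the driver's pulse-width value.
    return (value & 0x3FFFF) >> shift


def samples_pending(read_pointer, write_pointer, fifo_depth=32):
    # Number of unread samples; a negative difference means the write
    # pointer has wrapped past the end of the FIFO.
    n = write_pointer - read_pointer
    if n < 0:
        n += fifo_depth
    return n


print(fifo_bytes_to_int(b"\x01\x23\x45"))  # 74565 (0x012345)
print(fifo_bytes_to_int(b"\xff\xff\xff"))  # 262143 (upper bits masked off)
print(samples_pending(30, 2))              # 4 (write pointer wrapped)
```

The same wrap-around rule applies on the board, where the two pointers come from `get_read_pointer()` and `get_write_pointer()`.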
### pulse_oximeter.py
```python=
from max30102 import MAX30102
from utime import ticks_ms, ticks_diff


class IIR_filter(object):
    # First-order IIR low-pass filter (exponential moving average),
    # used below to estimate the DC level of the raw signal
    def __init__(self, alpha):
        self.old_value = 0
        self.alpha = alpha

    def step(self, value):
        value = (self.old_value*self.alpha
                 + value*(1 - self.alpha))
        self.old_value = value
        return value


class AC_extractor(object):
    # Tracks the positive peak and negative trough of the DC-free signal
    # and reports their difference (peak-to-peak AC amplitude) once per cycle
    def __init__(self):
        self.max_ac = 0
        self.min_ac = 0
        self.ac = 0
        self.cycle_time_mark = ticks_ms()
        self.get_time_mark = ticks_ms()
        self.is_down_period = False

    def update(self, value_nodc):
        if value_nodc > 0:
            if self.max_ac != 0 and self.min_ac != 0:
                # Signal is positive again after a trough: one cycle is done
                self.is_down_period = False
                time_intval = ticks_diff(ticks_ms(), self.cycle_time_mark)
                # Accept only cycles lasting between 270 ms and 2000 ms
                if 2000 > time_intval > 270:
                    self.ac = self.max_ac - self.min_ac
                    self.get_time_mark = ticks_ms()
                self.max_ac = 0
                self.min_ac = 0
                self.cycle_time_mark = ticks_ms()
            else:
                # Still in the positive half: track the peak
                if value_nodc > self.max_ac:
                    self.max_ac = value_nodc
        elif value_nodc < 0 and self.max_ac != 0:
            # Negative half after a peak: track the trough
            self.is_down_period = True
            if value_nodc < self.min_ac:
                self.min_ac = value_nodc

    def reset_ac(self):
        self.ac = 0


class HR_calculator(object):
    # Averages the beat-to-beat interval over target_n_beats beats
    def __init__(self, target_n_beats=5):
        self.target_n_beats = target_n_beats
        self.n_beats = 0
        self.heart_rate = 0.0
        self.tot_intval = 0
        self.beat_time_mark = ticks_ms()
        self.is_beating = False

    def update(self, is_beating):
        # Rising edge of is_beating marks the start of a new beat
        if self.is_beating == False and is_beating == True:
            rr_intval = ticks_diff(ticks_ms(), self.beat_time_mark)
            # Accept only intervals between 270 ms and 2000 ms
            if 2000 > rr_intval > 270:
                self.n_beats += 1
                self.tot_intval += rr_intval
                if self.n_beats == self.target_n_beats:
                    # Total interval in seconds, converted to beats per minute
                    tot_intval = self.tot_intval/1000
                    self.heart_rate = self.target_n_beats/(tot_intval/60)
                    self.tot_intval = 0
                    self.n_beats = 0
            else:
                # Implausible interval: discard the beats collected so far
                self.tot_intval = 0
                self.n_beats = 0
            self.beat_time_mark = ticks_ms()
        self.is_beating = is_beating

    def get_heart_rate(self):
        return self.heart_rate


class Pulse_oximeter(object):
    def __init__(self, sensor):
        sensor.set_led_mode(2)
        self.sensor = sensor
        self.raw_ir = 0
        self.raw_red = 0
        self.spo2 = 0
        self.heart_rate = 0
        self.is_beating = False
        self.is_available = False
        self.ac_extractor_ir = AC_extractor()
        self.ac_extractor_red = AC_extractor()
        self.dc_remover_ir = IIR_filter(0.99)
        self.dc_remover_red = IIR_filter(0.99)
        self.hr_calculator = HR_calculator()

    def update(self):
        self.spo2 = 0
        self.sensor.check()
        if self.sensor.available():
            self.is_available = True
            self.raw_ir = self.sensor.pop_ir_from_storage()
            self.raw_red = self.sensor.pop_red_from_storage()
            # Estimate the DC level and subtract it to get the AC part
            ir_dc = self.dc_remover_ir.step(self.raw_ir)
            red_dc = self.dc_remover_red.step(self.raw_red)
            ir_nodc = self.raw_ir - ir_dc
            red_nodc = self.raw_red - red_dc
            self.ac_extractor_ir.update(ir_nodc)
            self.ac_extractor_red.update(red_nodc)
            ir_ac = self.ac_extractor_ir.ac
            red_ac = self.ac_extractor_red.ac
            time_mark_ir = self.ac_extractor_ir.get_time_mark
            time_mark_red = self.ac_extractor_red.get_time_mark
            # The red channel's down period drives the heart-rate calculation
            self.is_beating = self.ac_extractor_red.is_down_period
            self.hr_calculator.update(self.is_beating)
            self.heart_rate = self.hr_calculator.get_heart_rate()
            ir_red_intval = abs(ticks_diff(time_mark_ir, time_mark_red))
            if ir_ac > 0 and red_ac > 0:
                # Use the two AC values only if they were taken within 100 ms
                if ir_red_intval < 100:
                    # Ratio of the normalized red and IR amplitudes
                    ratio = (red_ac/red_dc)/(ir_ac/ir_dc)
                    # Quadratic calibration curve from ratio to SpO2 (%)
                    self.spo2 = -45.060*ratio**2 + 30.354*ratio + 94.845
                self.ac_extractor_ir.reset_ac()
                self.ac_extractor_red.reset_ac()
        else:
            self.is_available = False

    def available(self):
        return self.is_available

    def get_spo2(self):
        return self.spo2

    def get_raw_ir(self):
        return self.raw_ir

    def get_raw_red(self):
        return self.raw_red

    def get_heart_rate(self):
        return self.heart_rate
```
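
The arithmetic inside `pulse_oximeter.py` can be tried out without the board. The sketch below restates three pieces of it as plain functions that run on a desktop Python interpreter: DC removal with the same first-order filter as `IIR_filter`, the heart-rate conversion used in `HR_calculator`, and the quadratic SpO2 formula from `Pulse_oximeter.update()`. The function names `remove_dc`, `heart_rate_bpm` and `spo2_from_ratio` are made up for this illustration and are not part of the course files; the input values are synthetic, not measurements.

```python
def remove_dc(samples, alpha=0.99):
    # Same update rule as IIR_filter.step(): a slowly moving average
    # estimates the DC level, which is then subtracted from each sample.
    dc = 0
    out = []
    for s in samples:
        dc = dc * alpha + s * (1 - alpha)
        out.append(s - dc)
    return out


def heart_rate_bpm(n_beats, total_interval_ms):
    # Same conversion as HR_calculator: beats divided by elapsed minutes.
    total_s = total_interval_ms / 1000
    return n_beats / (total_s / 60)


def spo2_from_ratio(ratio):
    # Same quadratic as Pulse_oximeter.update(); 'ratio' is
    # (red_ac/red_dc)/(ir_ac/ir_dc).
    return -45.060 * ratio**2 + 30.354 * ratio + 94.845


# A constant input has no AC part, so the residual decays towards zero.
residual = remove_dc([1000] * 1000)
print(residual[-1] < 1)                   # True

print(round(heart_rate_bpm(5, 4000), 1))  # 75.0 (5 beats in 4 seconds)
print(round(spo2_from_ratio(0.5), 2))     # 98.76
```

With `alpha = 0.99` the DC estimate needs several hundred samples to settle, which is one reason the first readings after placing a finger on the sensor are best ignored.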