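# Tzu Chi University, Department of Medical Informatics: Smart Healthcare Workshop (2023/12/22)

## AIoT and Smart Healthcare Applications (Quick Command Reference)

Instructor: Dr. Jack Hsu (許哲豪), 歐尼克斯實境互動工作室
Compiled 2023/12/22

![FM636A](https://hackmd.io/_uploads/SybtS0JPT.jpg =240x)

Course kit: [【旗標─創客‧自造者工作坊─Python×AI 生醫感測健康大應用】](https://www.flag.com.tw/maker/FM636A) (includes one textbook, the development board, and the sensors), [example programs](https://www.flag.com.tw/bk/t/FM636A)

**Note: This course is adapted from Chapters 1 to 6 of the original textbook. Participants who want more depth can consult the textbook and its example programs.**

[toc]

## 1. Introduction to AIoT and Biomedical Sensors

## 2. The ESP32 Microcontroller and Setting Up the Development Environment

1. Download the Python development tool Thonny: https://thonny.org/
2. Download the USB-to-UART driver CH341SER: http://www.wch.cn/downloads/CH341SER_EXE.html
3. Plug the small end of the USB cable into the development board and the large end into a USB port on the computer. Open Thonny, choose "Run" > "Configure interpreter" from the main menu, and switch to the "Interpreter" tab. Select "MicroPython (generic)" as the interpreter and "USB-Serial CH340 (COMx)" as the port, where x is a port number assigned by the system and differs between computers.
4. When the word "MicroPython" appears in the Shell at the bottom of Thonny, the connection is set up.
5. Set up Wi-Fi remote monitoring, which turns the development board into a web server so that values can be watched from a browser. (Choose one of the two options below.)
    * Connect both the development board and the laptop to the same Wi-Fi access point on a local network, then enter the private IP address that was dynamically assigned to the board in the browser to view the data. (This option does not work on the school's public wireless network.)
    * Turn a personal Android phone into a Wi-Fi hotspot. Turn off "Data Saver", go to "Network & internet" > "Hotspot & tethering", tap "Wi-Fi hotspot", set the hotspot name, the security type "WPA2 PSK", and the password, then turn the hotspot on. Next, have the development board connect to the phone's hotspot and obtain a private IP address, then open that address in the phone's browser. To use the browser on a laptop instead, the laptop must also be connected to the phone's hotspot.

### 2.1 LED Blink Test (Line-by-Line Commands)

Enter the following commands in the Shell one line at a time to turn the LED on and off.

```python=
from machine import Pin   # Import the pin-control class
led = Pin(5, Pin.OUT)     # Configure pin 5 as an output
led.value(0)              # Output low level: LED on
led.value(False)          # LED on, same as the previous command
led.value(1)              # Output high level: LED off
led.value(True)           # LED off, same as the previous command
```

### 2.2 LED Blink Test (Complete Program \CH02\LAB01.py)

1. Enter the following program in the Thonny editor and press the green arrow button on the toolbar to run it.
2. To keep the program, press the disk button on the toolbar to save it. You can save either to this computer or to the MicroPython device. **Remember to press the red square button to stop the program before saving.**
    * This computer: save in any directory under any name xxx.py.
    * MicroPython device: saved on the development board under any name xxx.py. If the file is named main.py, the board runs it automatically after a reset.

```python=
# Import the Pin class from the machine module
from machine import Pin
# Import the time module for delays
import time

# Create a Pin object for pin 5, configured as an output, named led
led = Pin(5, Pin.OUT)

while True:
    led.value(1)      # LED off
    time.sleep(0.5)   # Wait 0.5 s
    led.value(0)      # LED on
    time.sleep(0.5)   # Wait 0.5 s
```

**Exercise 1: Change the blink durations and counts, for example to produce the SOS distress signal (three short, three long, three short).**

## 3. Galvanic Skin Response (GSR) Measurement Experiment

### 3.1 ADC Read Test (Line-by-Line Commands)

Enter the following commands in the Shell one line at a time to read the ADC value (the external voltage).

```python=
# Import the Pin and ADC classes from the machine module
from machine import Pin, ADC

adc_pin = Pin(36)            # Pin 36 is the ESP32 VP pin
adc = ADC(adc_pin)           # Use pin 36 as the ADC input
adc.width(ADC.WIDTH_9BIT)    # Set the resolution to 9 bits (readings 0 to 511)
adc.atten(ADC.ATTN_11DB)     # Set the attenuation, which determines the maximum measurable voltage
adc.read()                   # Read the ADC value
```

### 3.2 Basic GSR Test (Complete Program) (\CH03\LAB02.py)

Reads and prints the ADC value (the external voltage) every 0.1 s.

```python=
import time
from machine import Pin, ADC

adc_pin = Pin(36)             # Pin 36 is the ESP32 VP pin
adc = ADC(adc_pin)            # Use pin 36 as the ADC input
adc.width(ADC.WIDTH_12BIT)    # Set the resolution to 12 bits (readings 0 to 4095)
adc.atten(ADC.ATTN_11DB)      # Set the attenuation, which determines the maximum measurable voltage

while True:                   # Loop forever, reading repeatedly
    gsr = adc.read()
    print(gsr)
    time.sleep(0.1)
```

### 3.3 Wireless Lie Detector Test (Complete Program) (\CH03\LAB03.py)

Before running the program, upload the following files to the MicroPython device.

* ESPWebServer.py: web server module
* index.html: monitoring web page (in \CH03\上傳資料)

Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")

Using the address printed in the Shell as "Connected, IP: xxx.xxx.xxx.xxx", open the page in a browser (Edge, Firefox, Chrome...) on a phone or computer to see the value change in real time.

```python=
# Import timing functions from the utime module
from utime import ticks_ms, ticks_diff
from machine import Pin, ADC
import network, ESPWebServer

adc_pin = Pin(36)            # Pin 36 is the ESP32 VP pin
adc = ADC(adc_pin)           # Use pin 36 as the ADC input
adc.width(ADC.WIDTH_9BIT)    # Set the resolution to 9 bits (readings 0 to 511)
adc.atten(ADC.ATTN_11DB)     # Set the attenuation, which determines the maximum measurable voltage

angle = 180                  # Angle converted from the GSR reading

def SendAngle(socket, args):    # Handler for the /lie request
    ESPWebServer.ok(socket, "200", str(angle))

# Map the GSR reading onto the range 180 to 360
def gsr_to_angle(raw_val, min_val, max_val):
    raw_val *= -1
    new_val = ((raw_val + max_val)
               / (max_val - min_val) * (360 - 180) + 180)
    return new_val

print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
# Replace with your own access point name (SSID) and password
sta.connect("SSID", "password")

while not sta.isconnected():
    pass

print("Connected, IP:", sta.ifconfig()[0])

ESPWebServer.begin(80)                    # Start the web server
ESPWebServer.onPath("/lie", SendAngle)    # Register the request handler

time_mark = ticks_ms()                    # Record the current time

while True:
    # Keep checking for new requests
    ESPWebServer.handleClient()

    # Run the task once more than 100 ms have passed since time_mark
    if ticks_diff(ticks_ms(), time_mark) > 100:
        gsr = adc.read()                  # Read the ADC value
        angle = gsr_to_angle(gsr, 400, 511)
        time_mark = ticks_ms()            # Reset the timer
```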
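The `gsr_to_angle` mapping in section 3.3 is a plain linear rescale: a reading equal to `max_val` maps to 180 and a reading equal to `min_val` maps to 360. The sketch below rewrites the same arithmetic in a form that runs on desktop Python so the end points can be checked without the board. The helper `gsr_to_angle_clamped` and its default limits are hypothetical additions for illustration, not part of the course code; it assumes you would want out-of-range readings pinned to the 180 to 360 range.

```python
def gsr_to_angle(raw_val, min_val, max_val):
    """Linearly map a reading in [min_val, max_val] to an angle in [180, 360].

    Algebraically the same as the LAB03 version:
    (max_val - raw_val) / (max_val - min_val) * 180 + 180
    """
    ratio = (max_val - raw_val) / (max_val - min_val)
    return ratio * (360 - 180) + 180


def gsr_to_angle_clamped(raw_val, min_val=400, max_val=511):
    """Hypothetical variant: clamp the reading first so the angle stays in range."""
    clamped = min(max(raw_val, min_val), max_val)
    return gsr_to_angle(clamped, min_val, max_val)


if __name__ == "__main__":
    for raw in (511, 400, 300):
        print(raw, gsr_to_angle(raw, 400, 511), gsr_to_angle_clamped(raw))
```

A reading below `min_val` (300 in the loop above) pushes the unclamped result past 360, which is why a clamp may be worth considering if the web page's needle is expected to stay on its dial.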
## 4. Blood Oxygen Saturation (SpO2) Measurement Experiment

### 4.1 Reading SpO2 (Direct Display) (\CH04\LAB04.py)

Place your index finger on the sensor first, then press the Run button on the toolbar. Readings start to appear in the Shell after about 15 seconds; nothing is printed while no reading is available. Try exhaling and then holding your breath for a while to observe the difference.

```python=
from machine import SoftI2C, Pin
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter

my_SCL_pin = 25         # I2C SCL pin
my_SDA_pin = 26         # I2C SDA pin

i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))

sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()

pox = Pulse_oximeter(sensor)    # SpO2 calculation class

count = 0
while True:
    pox.update()
    spo2 = pox.get_spo2()
    count = count+1
    if spo2 > 0:
        print("SpO2:", spo2, "%")
#    else:
#        print(count)
```

### 4.2 Reading SpO2 (Web Page Display) (\CH04\LAB05.py)

Before running the program, upload the following files to the MicroPython device.

* ESPWebServer.py: web server module
* max30102.py: blood oxygen sensor module
* pulse_oximeter.py: SpO2 calculation functions
* circular_buffer.py: helper used by the SpO2 calculation
* index.html: monitoring web page (in \CH04\上傳資料)

Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")

Using the address printed in the Shell as "Connected, IP: xxx.xxx.xxx.xxx", open the page in a browser (Edge, Firefox, Chrome...) on a phone or computer to see the value change in real time.

```python=
# Import timing functions from the utime module
from utime import ticks_ms, ticks_diff
from machine import SoftI2C, Pin
import network, ESPWebServer
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter

my_SCL_pin = 25         # I2C SCL pin
my_SDA_pin = 26         # I2C SDA pin

i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))

sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()

pox = Pulse_oximeter(sensor)    # SpO2 calculation class

spo2 = 0

def SendSpo2(socket, args):    # Handler for the /measure request
    ESPWebServer.ok(socket, "200", str(spo2))

print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect("SSID", "password")

while not sta.isconnected():
    pass

print("Connected, IP:", sta.ifconfig()[0])

ESPWebServer.begin(80)                        # Start the web server
ESPWebServer.onPath("/measure", SendSpo2)

time_mark = ticks_ms()
while True:
    ESPWebServer.handleClient()
    pox.update()
    spo2_tmp = pox.get_spo2()
    spo2_tmp = round(spo2_tmp, 1)

    if spo2_tmp > 0:
        time_mark = ticks_ms()
        spo2 = spo2_tmp
        print("SpO2:", spo2, "%")

    # Report 0 when no valid reading has arrived for 5 seconds
    if ticks_diff(ticks_ms(), time_mark) > 5000:
        spo2 = 0
```

## 5. Pulse and Heart Rate (PPG) Measurement Experiment

### 5.1 Reading the Heart Rate Signal (Direct Display) (\CH05\LAB06.py)

Place your index finger on the sensor first, then press the Run button on the toolbar. After 3 to 5 seconds, once the filter has settled on a stable DC value, readings start to appear in the Shell; nothing is printed while no reading is available.

```python=
from machine import SoftI2C, Pin
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter, IIR_filter

my_SCL_pin = 25         # I2C SCL pin
my_SDA_pin = 26         # I2C SDA pin

i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))

sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()

pox = Pulse_oximeter(sensor)

dc_extractor = IIR_filter(0.99)    # Extracts the DC component

while True:
    pox.update()                   # Update the pulse oximeter
    if pox.available():
        red_val = pox.get_raw_red()
        red_dc = dc_extractor.step(red_val)
        ppg = int(red_dc*1.01 - red_val)
        print(ppg)
```

### 5.2 Reading the Heart Rate (Web Page Display) (\CH05\LAB07.py)

Before running the program, upload the following files to the MicroPython device. The list is the same as in example 4.2; if you have already done that example, the first four files can be skipped, but the fifth item must still be uploaded because this chapter has its own index.html.

* ESPWebServer.py: web server module
* max30102.py: blood oxygen sensor module
* pulse_oximeter.py: SpO2 calculation functions
* circular_buffer.py: helper used by the SpO2 calculation
* index.html: monitoring web page (in \CH05\上傳資料)

Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")

Using the address printed in the Shell as "Connected, IP: xxx.xxx.xxx.xxx", open the page in a browser (Edge, Firefox, Chrome...) on a phone or computer to see the waveform (values) change in real time.

```python=
import _thread
from utime import ticks_ms, ticks_diff
from machine import SoftI2C, Pin
import network, ESPWebServer
from max30102 import MAX30102
from pulse_oximeter import Pulse_oximeter, IIR_filter

led = Pin(5, Pin.OUT)
led.value(1)

my_SCL_pin = 25         # I2C SCL pin
my_SDA_pin = 26         # I2C SDA pin

i2c = SoftI2C(sda=Pin(my_SDA_pin),
              scl=Pin(my_SCL_pin))

sensor = MAX30102(i2c=i2c)
sensor.setup_sensor()

pox = Pulse_oximeter(sensor)

dc_extractor = IIR_filter(0.99)       # Extracts the DC component
thresh_generator = IIR_filter(0.9)    # Generates the dynamic threshold

is_beating = False                    # Flag: currently inside a beat
beat_time_mark = ticks_ms()           # Time of the last detected beat
heart_rate = 0
num_beats = 0                         # Number of beats counted so far
target_n_beats = 3                    # Beats to accumulate before updating the heart rate
tot_intval = 0                        # Accumulated time between beats (ms)
ppg = 0

def cal_heart_rate(intval, target_n_beats=3):
    intval /= 1000                    # Milliseconds to seconds
    heart_rate = target_n_beats/(intval/60)
    heart_rate = round(heart_rate, 1)
    return heart_rate

def SendHrRate(socket, args):    # Handler for the /hr request
    ESPWebServer.ok(socket, "200", str(heart_rate))

def SendEcg(socket, args):       # Handler for the /line request
    ESPWebServer.ok(socket, "200", str(ppg))

def web_thread():                # Worker thread that serves the web page
    while True:
        ESPWebServer.handleClient()

print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect("SSID", "password")

while not sta.isconnected():
    pass

print("Connected, IP:", sta.ifconfig()[0])

ESPWebServer.begin(80)                     # Start the web server
ESPWebServer.onPath("/hr", SendHrRate)     # Register the request handler
ESPWebServer.onPath("/line", SendEcg)      # Register the request handler

_thread.start_new_thread(web_thread, ())   # Start the worker thread

while True:                                # Main thread
    pox.update()                           # Update the pulse oximeter
    if pox.available():
        red_val = pox.get_raw_red()
        red_dc = dc_extractor.step(red_val)
        ppg = max(int(red_dc*1.01 - red_val), 0)
        thresh = thresh_generator.step(ppg)

        if ppg > (thresh + 20) and not is_beating:
            is_beating = True
            led.value(0)

            intval = ticks_diff(ticks_ms(), beat_time_mark)
            if 2000 > intval > 270:
                tot_intval += intval
                num_beats += 1
                if num_beats == target_n_beats:
                    heart_rate = cal_heart_rate(
                        tot_intval, target_n_beats)
                    print(heart_rate)
                    tot_intval = 0
                    num_beats = 0
            else:
                tot_intval = 0
                num_beats = 0
            beat_time_mark = ticks_ms()
        elif ppg < thresh:
            is_beating = False
            led.value(1)
```

## 6. Electrocardiogram (ECG) Measurement Experiment

### 6.1 Reading the ECG Signal (Direct Display) (\CH06\LAB08.py)

Attach the yellow electrode pad to the left hand, the red one to the right hand, and the green one to the left leg. Every 50 ms the program prints the largest value read during that interval in the Shell.

```python=
from utime import ticks_ms, ticks_diff
from machine import Pin, ADC

adc_pin = Pin(36)             # Pin 36 is the ESP32 VP pin
adc = ADC(adc_pin)            # Use pin 36 as the ADC input
adc.width(ADC.WIDTH_10BIT)    # Set the resolution to 10 bits (readings 0 to 1023)
adc.atten(ADC.ATTN_11DB)      # Set the attenuation, which determines the maximum measurable voltage

max_val = 0                   # Largest value seen in the current interval

time_mark = ticks_ms()        # Record the current time
while True:
    raw_val = adc.read()
    if raw_val > max_val:
        max_val = raw_val

    if ticks_diff(ticks_ms(), time_mark) > 50:
        ecg = max_val
        print(ecg)
        max_val = 0
        time_mark = ticks_ms()    # Reset the timer
```

### 6.2 Reading the ECG Signal (Web Page Display) (\CH06\LAB09.py)

Before running the program, upload the following files to the MicroPython device. The list is the same as in example 4.2; if you have already done that example, the first four files can be skipped, but the fifth item must still be uploaded because this chapter has its own index.html.

* ESPWebServer.py: web server module
* max30102.py: blood oxygen sensor module
* pulse_oximeter.py: SpO2 calculation functions
* circular_buffer.py: helper used by the SpO2 calculation
* index.html: monitoring web page (in \CH06\上傳資料)

Change the Wi-Fi access point name (SSID) and password in the program:
sta.connect("SSID", "password")

Using the address printed in the Shell as "Connected, IP: xxx.xxx.xxx.xxx", open the page in a browser (Edge, Firefox, Chrome...) on a phone or computer to see the waveform (values) change in real time.

```python=
import _thread
from utime import ticks_ms, ticks_diff
from machine import Pin, ADC
import network, ESPWebServer
from pulse_oximeter import IIR_filter

buzzer = Pin(2, Pin.OUT)

adc_pin = Pin(36)             # Pin 36 is the ESP32 VP pin
adc = ADC(adc_pin)            # Use pin 36 as the ADC input
adc.width(ADC.WIDTH_10BIT)    # Set the resolution to 10 bits (readings 0 to 1023)
adc.atten(ADC.ATTN_11DB)      # Set the attenuation, which determines the maximum measurable voltage

thresh_generator = IIR_filter(0.9)    # Generates the dynamic threshold

is_beating = False                    # Flag: currently inside a beat
beat_time_mark = ticks_ms()           # Time of the last detected beat
heart_rate = 0
num_beats = 0                         # Number of beats counted so far
target_n_beats = 3                    # Beats to accumulate before updating the heart rate
tot_intval = 0                        # Accumulated time between beats (ms)
max_val = 0
ecg = 0

def cal_heart_rate(intval, target_n_beats=3):
    intval /= 1000                    # Milliseconds to seconds
    heart_rate = target_n_beats/(intval/60)
    heart_rate = round(heart_rate, 1)
    return heart_rate

def SendHrRate(socket, args):    # Handler for the /hr request
    ESPWebServer.ok(socket, "200", str(heart_rate))

def SendEcg(socket, args):       # Handler for the /line request
    ESPWebServer.ok(socket, "200", str(ecg))

def web_thread():                # Worker thread that serves the web page
    while True:
        ESPWebServer.handleClient()

print("Connecting...")
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.connect("SSID", "password")

while not sta.isconnected():
    pass

print("Connected, IP:", sta.ifconfig()[0])

ESPWebServer.begin(80)                     # Start the web server
ESPWebServer.onPath("/hr", SendHrRate)     # Register the request handler
ESPWebServer.onPath("/line", SendEcg)      # Register the request handler

_thread.start_new_thread(web_thread, ())

time_mark = ticks_ms()
while True:
    raw_val = adc.read()
    if raw_val > max_val:
        max_val = raw_val

    if ticks_diff(ticks_ms(), time_mark) > 50:
        ecg = max_val
        thresh = thresh_generator.step(ecg)

        if ecg > (thresh + 100) and not is_beating:
            is_beating = True
            buzzer.value(1)

            intval = ticks_diff(ticks_ms(), beat_time_mark)
            if 2000 > intval > 270:
                tot_intval += intval
                num_beats += 1
                if num_beats == target_n_beats:
                    heart_rate = cal_heart_rate(
                        tot_intval, target_n_beats)
                    print(heart_rate)
                    tot_intval = 0
                    num_beats = 0
            else:
                tot_intval = 0
                num_beats = 0
            beat_time_mark = ticks_ms()
        elif ecg < thresh:
            is_beating = False
            buzzer.value(0)

        max_val = 0
        time_mark = ticks_ms()
```

## Appendix: External Modules

### circular_buffer.py

```python=
from ucollections import deque


class CircularBuffer(object):
    ''' Very simple implementation of a circular buffer based on deque '''
    def __init__(self, max_size):
        self.data = deque((), max_size, True)
        self.max_size = max_size

    def __len__(self):
        return len(self.data)

    def is_empty(self):
        return not bool(self.data)

    def append(self, item):
        try:
            self.data.append(item)
        except IndexError:
            # deque full, popping 1st item out
            self.data.popleft()
            self.data.append(item)

    def pop(self):
        return self.data.popleft()

    def clear(self):
        self.data = deque((), self.max_size, True)

    def pop_head(self):
        buffer_size = len(self.data)
        temp = self.data
        if buffer_size == 1:
            pass
        elif buffer_size > 1:
            self.data.clear()
            for x in range(buffer_size - 1):
                self.data = temp.popleft()
        else:
            return 0
        return temp.popleft()
```

### ESPWebServer.py

```python=
"""A simple HTTP server that only accepts GET requests.
It adopts the programming style of the ESP8266WebServer
library in the ESP8266 Arduino Core.
"""
import network
import machine
import socket
import uselect
import os

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Used for checking for a new client connection
poller = uselect.poll()
# Dict of registered handlers for all paths
handlers = {}
# Handler function for requests that are not found
notFoundHandler = None
# The path to the web documents on the MicroPython filesystem
docPath = "/"
# Data for templates
tplData = {}

def begin(port):
    """Start the HTTP server
    """
    global server, poller
    server.bind(('0.0.0.0', port))
    server.listen(1)
    # Register for checking new client connections
    poller.register(server, uselect.POLLIN)

def close():
    """Stop the HTTP server
    """
    poller.unregister(server)
    server.close()

def handleClient():
    """Check for a new client connection and process the request
    """
    global server, poller
    # Note: don't call poll() with 0, that would randomly cause
    # a reset with a "Fatal exception 28(LoadProhibitedCause)" message
    res = poller.poll(1)
    if res:  # There's a new client connection
        (socket, sockaddr) = server.accept()
        handle(socket)
        socket.close()

def __sendPage(socket, filePath):
    """Send the file as a web page to the client
    """
    try:
        f = open(filePath, "rb")
        while True:
            data = f.read(64)
            if (data == b""):
                break
            socket.write(data)
        f.close()
    except Exception as e:
        print(e)

def err(socket, code, message):
    """Respond with an error message to the client
    """
    socket.write("HTTP/1.1 " + code + " " + message + "\r\n\r\n")
    socket.write("<h1>" + message + "</h1>")

def ok(socket, code, msg):
    """Respond with a success message or a web page to the client
    """
    socket.write("HTTP/1.1 " + code + " OK\r\n\r\n")
    if __fileExist(msg):
        filePath = msg
        __sendPage(socket, filePath)
    else:
        socket.write(msg)

def __fileExist(path):
    """Check whether the file exists
    """
    # print(path)
    try:
        stat = os.stat(path)
        # stat[0] bit 15 / 14 -> file/dir
        if stat[0] & 0x8000 == 0x8000:  # file
            # print("Found.")
            return True
        else:  # Dir
            return False
    except:
        # print("Not Found.")
        return False

def handle(socket):
    """Process a new GET request
    """
    global docPath, handlers
    currLine = str(socket.readline(), 'utf-8')
    request = currLine.split(" ")
    if len(request) != 3:  # Discard if it's a bad header
        return
    (method, url, version) = request
    if "?" in url:  # Check whether there's a query string
        # maxsplit=1 so that a second "?" in the URL cannot
        # produce three parts and break the unpacking
        (path, query) = url.split("?", 1)
    else:
        (path, query) = (url, "")
    args = {}
    if query:  # Parse the query string
        argPairs = query.split("&")
        for argPair in argPairs:
            arg = argPair.split("=")
            args[arg[0]] = arg[1]
    while True:  # Read until the blank line after the header
        header = socket.readline()
        if header == b"":
            return
        if header == b"\r\n":
            break

    # Check for a supported HTTP version
    if version != "HTTP/1.0\r\n" and version != "HTTP/1.1\r\n":
        err(socket, "505", "Version Not Supported")
    elif method != "GET":  # Only accept GET requests
        err(socket, "501", "Not Implemented")
    elif path in handlers:  # Check for a registered path
        handlers[path](socket, args)
    #elif not path.startswith(docPath): # Check for path to any document
    #    err(socket, "400", "Bad Request")
    else:
        filePath = path
        # find the file
        if not __fileExist(filePath):
            filePath = path + ("index.html" if path.endswith("/") else "/index.html")
            # find index.html in the path
            if not __fileExist(filePath):
                filePath = path + ("index.p.html" if path.endswith("/") else "/index.p.html")
                # find index.p.html in the path
                if not __fileExist(filePath):
                    if notFoundHandler:
                        notFoundHandler(socket)
                    else:
                        err(socket, "404", "Not Found")
                    return
        # Send the header first
        socket.write("HTTP/1.1 200 OK\r\n\r\n")
        # Send the file content
        if filePath.endswith(".p.html"):
            # print("template file.")
            f = open(filePath, "r")
            for l in f:
                socket.write(l.format(**tplData))
            f.close()
        else:
            __sendPage(socket, filePath)

def onPath(path, handler):
    """Register a handler for requests to the specified path
    """
    global handlers
    handlers[path] = handler

def onNotFound(handler):
    """Register a handler for requests to paths that are not found
    """
    global notFoundHandler
    notFoundHandler = handler

def setDocPath(path):
    """Set the path to the documents' directory
    """
    global docPath
    docPath = path

def setTplData(data):
    """Set data for templates
    """
    global tplData
    tplData = data
```

### max30102.py

```python=
# This work is a lot based on:
# - https://github.com/sparkfun/SparkFun_MAX3010x_Sensor_Library
#   Written by Peter Jansen and Nathan Seidle (SparkFun)
#   This is a library written for the Maxim MAX30102 Optical Smoke Detector
#   It should also work with the MAX30102, which has a Green LED, too.
#   These sensors use I2C to communicate, as well as a single (optional)
#   interrupt line that is not currently supported in this driver.
#   Written by Peter Jansen and Nathan Seidle (SparkFun)
#   BSD license, all text above must be included in any redistribution.
#
# - https://github.com/kandizzy/esp32-micropython/blob/master/PPG/ppg/MAX30102.py
#   A port of the library to MicroPython by kandizzy
#
# This driver aims at giving almost full access to Maxim MAX30102 functionalities.
#                                                                          n-elia

import uerrno
from machine import SoftI2C
from ustruct import unpack
from utime import sleep_ms, ticks_diff, ticks_ms

from circular_buffer import CircularBuffer

# I2C address (7-bit address)
MAX3010X_I2C_ADDRESS = 0x57  # Right-shift of 0xAE, 0xAF

# Status Registers
MAX30102_INT_STAT_1 = 0x00
MAX30102_INT_STAT_2 = 0x01
MAX30102_INT_ENABLE_1 = 0x02
MAX30102_INT_ENABLE_2 = 0x03

# FIFO Registers
MAX30102_FIFO_WRITE_PTR = 0x04
MAX30102_FIFO_OVERFLOW = 0x05
MAX30102_FIFO_READ_PTR = 0x06
MAX30102_FIFO_DATA = 0x07

# Configuration Registers
MAX30102_FIFO_CONFIG = 0x08
MAX30102_MODE_CONFIG = 0x09
MAX30102_PARTICLE_CONFIG = 0x0A  # Sometimes listed as 'SPO2' in datasheet (pag.11)
MAX30102_LED1_PULSE_AMP = 0x0C  # IR
MAX30102_LED2_PULSE_AMP = 0x0D  # RED
MAX30102_LED_PROX_AMP = 0x10
MAX30102_MULTI_LED_CONFIG_1 = 0x11
MAX30102_MULTI_LED_CONFIG_2 = 0x12

# Die Temperature Registers
MAX30102_DIE_TEMP_INT = 0x1F
MAX30102_DIE_TEMP_FRAC = 0x20
MAX30102_DIE_TEMP_CONFIG = 0x21

# Proximity Function Registers
MAX30102_PROX_INT_THRESH = 0x30

# Part ID Registers
MAX30102_REVISION_ID = 0xFE
MAX30102_PART_ID = 0xFF  # Should always be 0x15. Identical for MAX30102.

# MAX30102 Commands
# Interrupt configuration (datasheet pag 13, 14)
MAX30102_INT_A_FULL_MASK = ~0b10000000
MAX30102_INT_A_FULL_ENABLE = 0x80
MAX30102_INT_A_FULL_DISABLE = 0x00

MAX30102_INT_DATA_RDY_MASK = ~0b01000000
MAX30102_INT_DATA_RDY_ENABLE = 0x40
MAX30102_INT_DATA_RDY_DISABLE = 0x00

MAX30102_INT_ALC_OVF_MASK = ~0b00100000
MAX30102_INT_ALC_OVF_ENABLE = 0x20
MAX30102_INT_ALC_OVF_DISABLE = 0x00

MAX30102_INT_PROX_INT_MASK = ~0b00010000
MAX30102_INT_PROX_INT_ENABLE = 0x10
MAX30102_INT_PROX_INT_DISABLE = 0x00

MAX30102_INT_DIE_TEMP_RDY_MASK = ~0b00000010
MAX30102_INT_DIE_TEMP_RDY_ENABLE = 0x02
MAX30102_INT_DIE_TEMP_RDY_DISABLE = 0x00

# FIFO data queue configuration
MAX30102_SAMPLE_AVG_MASK = ~0b11100000
MAX30102_SAMPLE_AVG_1 = 0x00
MAX30102_SAMPLE_AVG_2 = 0x20
MAX30102_SAMPLE_AVG_4 = 0x40
MAX30102_SAMPLE_AVG_8 = 0x60
MAX30102_SAMPLE_AVG_16 = 0x80
MAX30102_SAMPLE_AVG_32 = 0xA0

MAX30102_ROLLOVER_MASK = 0xEF
MAX30102_ROLLOVER_ENABLE = 0x10
MAX30102_ROLLOVER_DISABLE = 0x00

# Mask for 'almost full' interrupt (defaults to 32 samples)
MAX30102_A_FULL_MASK = 0xF0

# Mode configuration commands (page 19)
MAX30102_SHUTDOWN_MASK = 0x7F
MAX30102_SHUTDOWN = 0x80
MAX30102_WAKEUP = 0x00
MAX30102_RESET_MASK = 0xBF
MAX30102_RESET = 0x40

MAX30102_MODE_MASK = 0xF8
MAX30102_MODE_IR_ONLY = 0x02
MAX30102_MODE_IR_RED = 0x03
MAX30102_MODE_MULTI_LED = 0x07

# Particle sensing configuration commands (pgs 19-20)
MAX30102_ADC_RANGE_MASK = 0x9F
MAX30102_ADC_RANGE_2048 = 0x00
MAX30102_ADC_RANGE_4096 = 0x20
MAX30102_ADC_RANGE_8192 = 0x40
MAX30102_ADC_RANGE_16384 = 0x60

MAX30102_SAMPLERATE_MASK = 0xE3
MAX30102_SAMPLERATE_50 = 0x00
MAX30102_SAMPLERATE_100 = 0x04
MAX30102_SAMPLERATE_200 = 0x08
MAX30102_SAMPLERATE_400 = 0x0C
MAX30102_SAMPLERATE_800 = 0x10
MAX30102_SAMPLERATE_1000 = 0x14
MAX30102_SAMPLERATE_1600 = 0x18
MAX30102_SAMPLERATE_3200 = 0x1C

MAX30102_PULSE_WIDTH_MASK = 0xFC
MAX30102_PULSE_WIDTH_69 = 0x00
MAX30102_PULSE_WIDTH_118 = 0x01
MAX30102_PULSE_WIDTH_215 = 0x02
MAX30102_PULSE_WIDTH_411 = 0x03

# LED brightness level. It affects the distance of detection.
MAX30102_PULSE_AMP_LOWEST = 0x02  # 0.4mA  - Presence detection of ~4 inch
MAX30102_PULSE_AMP_LOW = 0x1F  # 6.4mA  - Presence detection of ~8 inch
MAX30102_PULSE_AMP_MEDIUM = 0x7F  # 25.4mA - Presence detection of ~8 inch
MAX30102_PULSE_AMP_HIGH = 0xFF  # 50.0mA - Presence detection of ~12 inch

# Multi-LED Mode configuration (datasheet pag 22)
MAX30102_SLOT1_MASK = 0xF8
MAX30102_SLOT2_MASK = 0x8F
MAX30102_SLOT3_MASK = 0xF8
MAX30102_SLOT4_MASK = 0x8F
SLOT_NONE = 0x00
SLOT_IR_LED = 0x01
SLOT_RED_LED = 0x02
SLOT_NONE_PILOT = 0x04
SLOT_IR_PILOT = 0x05
SLOT_RED_PILOT = 0x06

MAX30102_EXPECTED_PART_ID = 0x15

TAG = 'MAX30102'

# Size of the queued readings
STORAGE_QUEUE_SIZE = 4


# Data structure to hold the last readings
class SensorData:
    def __init__(self):
        self.ir = CircularBuffer(STORAGE_QUEUE_SIZE)
        self.red = CircularBuffer(STORAGE_QUEUE_SIZE)
        self.green = CircularBuffer(STORAGE_QUEUE_SIZE)


# Sensor class
class MAX30102(object):
    def __init__(self,
                 i2c: SoftI2C,
                 i2c_hex_address=MAX3010X_I2C_ADDRESS,
                 ):
        self._address = i2c_hex_address
        self._i2c = i2c
        self._active_leds = None
        self._pulse_width = None
        self._multi_led_read_mode = None
        # Store current config values to compute acquisition frequency
        self._sample_rate = None
        self._sample_avg = None
        self._acq_frequency = None
        self._acq_frequency_inv = None
        # Circular buffer of readings from the sensor
        self.sense = SensorData()

        try:
            # self._i2c.readfrom(self._address, 1)
            # Some boards won't work if scan() is never called
            dev_list = self._i2c.scan()
            if not self._address in dev_list:
                raise OSError(uerrno.ENODEV)
        except OSError as error:
            if error.errno == uerrno.ENODEV:
                raise RuntimeError("Sensor not found on I2C bus.")
            else:
                raise RuntimeError("Error while reading from I2C bus: OSError code "
                                   + str(error.errno))

        if not (self.check_part_id()):
            raise RuntimeError("I2C device ID not corresponding to MAX30102 or MAX30102")

    # Sensor setup method
    def setup_sensor(self, led_mode=2, adc_range=16384, sample_rate=400,
                     led_power=MAX30102_PULSE_AMP_MEDIUM, sample_avg=8,
                     pulse_width=411):
        # Reset the sensor's registers from previous configurations
        self.soft_reset()

        # Set the number of samples to be averaged by the chip (default 8)
        self.set_fifo_average(sample_avg)

        # Allow FIFO queues to wrap/roll over
        self.enable_fifo_rollover()

        # Set the LED mode to the default value of 2 (IR + RED)
        # Note: the 3rd mode is available only with MAX30102
        self.set_led_mode(led_mode)

        # Set the ADC range to default value of 16384
        self.set_adc_range(adc_range)

        # Set the sample rate to the default value of 400
        self.set_sample_rate(sample_rate)

        # Set the Pulse Width to the default value of 411
        self.set_pulse_width(pulse_width)

        # Set the LED brightness to the default value of 'medium'
        self.set_pulse_amplitude_ir(led_power)
        self.set_pulse_amplitude_red(led_power)
        self.set_pulse_amplitude_proximity(led_power)

        # Clears the FIFO
        self.clear_fifo()

    def __del__(self):
        self.shutdown()

    # Methods to read the two interrupt flags
    def get_int_1(self):
        # Load the Interrupt 1 status (configurable) from the register
        rev_id = self.i2c_read_register(MAX30102_INT_STAT_1)
        return rev_id

    def get_int_2(self):
        # Load the Interrupt 2 status (DIE_TEMP_RDY) from the register
        rev_id = self.i2c_read_register(MAX30102_INT_STAT_2)
        return rev_id

    # Methods to set up the interrupt flags
    def enable_a_full(self):
        # Enable the almost full interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_A_FULL_MASK,
                     MAX30102_INT_A_FULL_ENABLE)

    def disable_a_full(self):
        # Disable the almost full interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_A_FULL_MASK,
                     MAX30102_INT_A_FULL_DISABLE)

    def enable_data_rdy(self):
        # Enable the new FIFO data ready interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_DATA_RDY_MASK,
                     MAX30102_INT_DATA_RDY_ENABLE)

    def disable_data_rdy(self):
        # Disable the new FIFO data ready interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_DATA_RDY_MASK,
                     MAX30102_INT_DATA_RDY_DISABLE)

    def enable_alc_ovf(self):
        # Enable the ambient light limit interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_ALC_OVF_MASK,
                     MAX30102_INT_ALC_OVF_ENABLE)

    def disable_alc_ovf(self):
        # Disable the ambient light limit interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_ALC_OVF_MASK,
                     MAX30102_INT_ALC_OVF_DISABLE)

    def enable_prox_int(self):
        # Enable the proximity interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_PROX_INT_MASK,
                     MAX30102_INT_PROX_INT_ENABLE)

    def disable_prox_int(self):
        # Disable the proximity interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_1,
                     MAX30102_INT_PROX_INT_MASK,
                     MAX30102_INT_PROX_INT_DISABLE)

    def enable_die_temp_rdy(self):
        # Enable the die temp. conversion finish interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_2,
                     MAX30102_INT_DIE_TEMP_RDY_MASK,
                     MAX30102_INT_DIE_TEMP_RDY_ENABLE)

    def disable_die_temp_rdy(self):
        # Disable the die temp. conversion finish interrupt (datasheet pag. 13)
        self.bitmask(MAX30102_INT_ENABLE_2,
                     MAX30102_INT_DIE_TEMP_RDY_MASK,
                     MAX30102_INT_DIE_TEMP_RDY_DISABLE)

    # Configuration reset
    def soft_reset(self):
        # When the RESET bit is set to one, all configuration, threshold,
        # and data registers are reset to their power-on-state through
        # a power-on reset. The RESET bit is cleared automatically back to zero
        # after the reset sequence is completed. (datasheet pag. 19)
        self.set_bitmask(MAX30102_MODE_CONFIG,
                         MAX30102_RESET_MASK,
                         MAX30102_RESET)
        curr_status = -1
        while not ((curr_status & MAX30102_RESET) == 0):
            sleep_ms(10)
            curr_status = ord(self.i2c_read_register(MAX30102_MODE_CONFIG))

    # Power states methods
    def shutdown(self):
        # Put IC into low power mode (datasheet pg. 19)
        # During shutdown the IC will continue to respond to I2C commands but
        # will not update with or take new readings (such as temperature).
        self.set_bitmask(MAX30102_MODE_CONFIG,
                         MAX30102_SHUTDOWN_MASK,
                         MAX30102_SHUTDOWN)

    def wakeup(self):
        # Pull IC out of low power mode (datasheet pg. 19)
        self.set_bitmask(MAX30102_MODE_CONFIG,
                         MAX30102_SHUTDOWN_MASK,
                         MAX30102_WAKEUP)

    # LED Configuration
    def set_led_mode(self, LED_mode):
        # Set LED mode: select which LEDs are used for sampling
        # Options: IR only, IR + RED (datasheet pag. 19)
        if LED_mode == 1:
            self.set_bitmask(MAX30102_MODE_CONFIG,
                             MAX30102_MODE_MASK,
                             MAX30102_MODE_IR_ONLY)
        elif LED_mode == 2:
            self.set_bitmask(MAX30102_MODE_CONFIG,
                             MAX30102_MODE_MASK,
                             MAX30102_MODE_IR_RED)
        else:
            raise ValueError('Wrong LED mode:{0}!'.format(LED_mode))

        # Multi-LED Mode Configuration: enable the reading of the LEDs
        # depending on the chosen mode
        self.enable_slot(1, SLOT_IR_LED)
        if LED_mode > 1:
            self.enable_slot(2, SLOT_RED_LED)

        # Store the LED mode used to control how many bytes to read from
        # FIFO buffer in multiLED mode: a sample is made of 3 bytes
        self._active_leds = LED_mode
        self._multi_led_read_mode = LED_mode * 3

    # ADC Configuration
    def set_adc_range(self, ADC_range):
        # ADC range: set the range of the conversion
        # Options: 2048, 4096, 8192, 16384
        # Current draw: 7.81pA, 15.63pA, 31.25pA, 62.5pA per LSB.
        if ADC_range == 2048:
            r = MAX30102_ADC_RANGE_2048
        elif ADC_range == 4096:
            r = MAX30102_ADC_RANGE_4096
        elif ADC_range == 8192:
            r = MAX30102_ADC_RANGE_8192
        elif ADC_range == 16384:
            r = MAX30102_ADC_RANGE_16384
        else:
            raise ValueError('Wrong ADC range:{0}!'.format(ADC_range))

        self.set_bitmask(MAX30102_PARTICLE_CONFIG, MAX30102_ADC_RANGE_MASK, r)

    # Sample Rate Configuration
    def set_sample_rate(self, sample_rate):
        # Sample rate: select the number of samples taken per second.
        # Options: 50, 100, 200, 400, 800, 1000, 1600, 3200
        # Note: in theory, the resulting acquisition frequency for the end user
        # is sampleRate/sampleAverage. However, it is worth testing it before
        # assuming that the sensor can effectively sustain that frequency
        # given its configuration.
        if sample_rate == 50:
            sr = MAX30102_SAMPLERATE_50
        elif sample_rate == 100:
            sr = MAX30102_SAMPLERATE_100
        elif sample_rate == 200:
            sr = MAX30102_SAMPLERATE_200
        elif sample_rate == 400:
            sr = MAX30102_SAMPLERATE_400
        elif sample_rate == 800:
            sr = MAX30102_SAMPLERATE_800
        elif sample_rate == 1000:
            sr = MAX30102_SAMPLERATE_1000
        elif sample_rate == 1600:
            sr = MAX30102_SAMPLERATE_1600
        elif sample_rate == 3200:
            sr = MAX30102_SAMPLERATE_3200
        else:
            raise ValueError('Wrong sample rate:{0}!'.format(sample_rate))

        self.set_bitmask(MAX30102_PARTICLE_CONFIG, MAX30102_SAMPLERATE_MASK, sr)

        # Store the sample rate and recompute the acq. freq.
        self._sample_rate = sample_rate
        self.update_acquisition_frequency()

    # Pulse width Configuration
    def set_pulse_width(self, pulse_width):
        # Pulse width of LEDs: The longer the pulse width the longer range of
        # detection. At 69us and 0.4mA it's about 2 inches,
        # at 411us and 0.4mA it's about 6 inches.
        if pulse_width == 69:
            pw = MAX30102_PULSE_WIDTH_69
        elif pulse_width == 118:
            pw = MAX30102_PULSE_WIDTH_118
        elif pulse_width == 215:
            pw = MAX30102_PULSE_WIDTH_215
        elif pulse_width == 411:
            pw = MAX30102_PULSE_WIDTH_411
        else:
            raise ValueError('Wrong pulse width:{0}!'.format(pulse_width))

        self.set_bitmask(MAX30102_PARTICLE_CONFIG, MAX30102_PULSE_WIDTH_MASK, pw)

        # Store the pulse width
        self._pulse_width = pw

    # LED Pulse Amplitude Configuration methods
    def set_active_leds_amplitude(self, amplitude):
        if self._active_leds > 0:
            self.set_pulse_amplitude_ir(amplitude)
        if self._active_leds > 1:
            self.set_pulse_amplitude_red(amplitude)

    def set_pulse_amplitude_ir(self, amplitude):
        self.i2c_set_register(MAX30102_LED1_PULSE_AMP, amplitude)

    def set_pulse_amplitude_red(self, amplitude):
        self.i2c_set_register(MAX30102_LED2_PULSE_AMP, amplitude)

    def set_pulse_amplitude_proximity(self, amplitude):
        self.i2c_set_register(MAX30102_LED_PROX_AMP, amplitude)

    def set_proximity_threshold(self, thresh_msb):
        # Set the IR ADC count that will trigger the beginning of particle-
        # sensing mode. The threshMSB signifies only the 8 most significant bits
        # of the ADC count. (datasheet page 24)
        self.i2c_set_register(MAX30102_PROX_INT_THRESH, thresh_msb)

    # FIFO averaged samples number Configuration
    def set_fifo_average(self, number_of_samples):
        # FIFO sample avg: set the number of samples to be averaged by the chip.
        # Options: MAX30102_SAMPLE_AVG_1, 2, 4, 8, 16, 32
        if number_of_samples == 1:
            ns = MAX30102_SAMPLE_AVG_1
        elif number_of_samples == 2:
            ns = MAX30102_SAMPLE_AVG_2
        elif number_of_samples == 4:
            ns = MAX30102_SAMPLE_AVG_4
        elif number_of_samples == 8:
            ns = MAX30102_SAMPLE_AVG_8
        elif number_of_samples == 16:
            ns = MAX30102_SAMPLE_AVG_16
        elif number_of_samples == 32:
            ns = MAX30102_SAMPLE_AVG_32
        else:
            raise ValueError(
                'Wrong number of samples:{0}!'.format(number_of_samples))

        self.set_bitmask(MAX30102_FIFO_CONFIG, MAX30102_SAMPLE_AVG_MASK, ns)

        # Store the number of averaged samples and recompute the acq. freq.
        self._sample_avg = number_of_samples
        self.update_acquisition_frequency()

    def update_acquisition_frequency(self):
        if None in [self._sample_rate, self._sample_avg]:
            return
        else:
            self._acq_frequency = self._sample_rate / self._sample_avg
            from math import ceil

            # Compute the time interval to wait before taking a good measure
            # (see note in setSampleRate() method)
            self._acq_frequency_inv = int(ceil(1000 / self._acq_frequency))

    def get_acquisition_frequency(self):
        return self._acq_frequency

    def clear_fifo(self):
        # Resets all pointers to start in a known state
        # Datasheet page 15 recommends clearing FIFO before beginning a read
        self.i2c_set_register(MAX30102_FIFO_WRITE_PTR, 0)
        self.i2c_set_register(MAX30102_FIFO_OVERFLOW, 0)
        self.i2c_set_register(MAX30102_FIFO_READ_PTR, 0)

    def enable_fifo_rollover(self):
        # FIFO rollover: enable to allow FIFO to wrap/roll over
        self.set_bitmask(MAX30102_FIFO_CONFIG,
                         MAX30102_ROLLOVER_MASK,
                         MAX30102_ROLLOVER_ENABLE)

    def disable_fifo_rollover(self):
        # FIFO rollover: disable to disallow FIFO to wrap/roll over
        self.set_bitmask(MAX30102_FIFO_CONFIG,
                         MAX30102_ROLLOVER_MASK,
                         MAX30102_ROLLOVER_DISABLE)

    def set_fifo_almost_full(self, number_of_samples):
        # Set number of samples to trigger the almost full interrupt (page 18)
        # Power on default is 32 samples. Note it is reverse: 0x00 is
        # 32 samples, 0x0F is 17 samples
        self.set_bitmask(MAX30102_FIFO_CONFIG,
                         MAX30102_A_FULL_MASK,
                         number_of_samples)

    def get_write_pointer(self):
        # Read the FIFO Write Pointer from the register
        wp = self.i2c_read_register(MAX30102_FIFO_WRITE_PTR)
        return wp

    def get_read_pointer(self):
        # Read the FIFO Read Pointer from the register
        wp = self.i2c_read_register(MAX30102_FIFO_READ_PTR)
        return wp

    # Die Temperature method: returns the temperature in C
    def read_temperature(self):
        # DIE_TEMP_RDY interrupt must be enabled
        # Config die temperature register to take 1 temperature sample
        self.i2c_set_register(MAX30102_DIE_TEMP_CONFIG, 0x01)

        # Poll for bit to clear, reading is then complete
        reading = ord(self.i2c_read_register(MAX30102_INT_STAT_2))
        sleep_ms(100)
        while (reading & MAX30102_INT_DIE_TEMP_RDY_ENABLE) > 0:
            reading = ord(self.i2c_read_register(MAX30102_INT_STAT_2))
            sleep_ms(1)

        # Read die temperature register (integer)
        tempInt = ord(self.i2c_read_register(MAX30102_DIE_TEMP_INT))
        # Causes the clearing of the DIE_TEMP_RDY interrupt
        tempFrac = ord(self.i2c_read_register(MAX30102_DIE_TEMP_FRAC))

        # Calculate temperature (datasheet pg. 23)
        return float(tempInt) + (float(tempFrac) * 0.0625)

    def set_prox_int_tresh(self, val):
        # Set the PROX_INT_THRESH (see proximity function on datasheet, pag 10)
        self.i2c_set_register(MAX30102_PROX_INT_THRESH, val)

    # DeviceID and Revision methods
    def read_part_id(self):
        # Load the Device ID from the register
        part_id = self.i2c_read_register(MAX30102_PART_ID)
        return part_id

    def check_part_id(self):
        # Checks the correctness of the Device ID
        part_id = ord(self.read_part_id())
        return part_id == MAX30102_EXPECTED_PART_ID

    def get_revision_id(self):
        # Load the Revision ID from the register
        rev_id = self.i2c_read_register(MAX30102_REVISION_ID)
        return ord(rev_id)

    # Time slots management for multi-LED operation mode
    def enable_slot(self, slot_number, device):
        # In multi-LED mode, each sample is split into up to four time slots,
        # SLOT1 through SLOT4. These control registers determine which LED is
        # active in each time slot. (datasheet pag 22)
        # Devices are SLOT_RED_LED or SLOT_RED_PILOT (proximity)
        # Assigning a SLOT_RED_LED will pulse LED
        # Assigning a SLOT_RED_PILOT will detect the proximity
        if slot_number == 1:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_1,
                         MAX30102_SLOT1_MASK,
                         device)
        elif slot_number == 2:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_1,
                         MAX30102_SLOT2_MASK,
                         device << 4)
        elif slot_number == 3:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_2,
                         MAX30102_SLOT3_MASK,
                         device)
        elif slot_number == 4:
            self.bitmask(MAX30102_MULTI_LED_CONFIG_2,
                         MAX30102_SLOT4_MASK,
                         device << 4)
        else:
            raise ValueError('Wrong slot number:{0}!'.format(slot_number))

    def disable_slots(self):
        # Clear all the slots assignments
        self.i2c_set_register(MAX30102_MULTI_LED_CONFIG_1, 0)
        self.i2c_set_register(MAX30102_MULTI_LED_CONFIG_2, 0)

    # Low-level I2C Communication
    def i2c_read_register(self, REGISTER, n_bytes=1):
        self._i2c.writeto(self._address, bytearray([REGISTER]))
        return self._i2c.readfrom(self._address, n_bytes)

    def i2c_set_register(self, REGISTER, VALUE):
        self._i2c.writeto(self._address,
bytearray([REGISTER, VALUE])) return # Given a register, read it, mask it, and then set the thing def set_bitmask(self, REGISTER, MASK, NEW_VALUES): newCONTENTS = (ord(self.i2c_read_register(REGISTER)) & MASK) | NEW_VALUES self.i2c_set_register(REGISTER, newCONTENTS) return # Given a register, read it and mask it def bitmask(self, reg, slotMask, thing): originalContents = ord(self.i2c_read_register(reg)) originalContents = originalContents & slotMask self.i2c_set_register(reg, originalContents | thing) def fifo_bytes_to_int(self, fifo_bytes): value = unpack(">i", b'\x00' + fifo_bytes) return (value[0] & 0x3FFFF) >> self._pulse_width # Returns how many samples are available def available(self): number_of_samples = len(self.sense.ir) return number_of_samples # Get a new IR value def get_ir(self): # Check the sensor for new data for 250ms if self.safe_check(250): return self.sense.ir.pop_head() else: # Sensor failed to find new data return 0 # Get a new red value def get_red(self): # Check the sensor for new data for 250ms if self.safe_check(250): return self.sense.red.pop_head() else: # Sensor failed to find new data return 0 # Note: the following 3 functions are the equivalent of using 'getFIFO' # methods of the SparkFun library # Pops the next IR value in storage (if available) def pop_ir_from_storage(self): if len(self.sense.ir) == 0: return 0 else: return self.sense.ir.pop() # Pops the next red value in storage (if available) def pop_red_from_storage(self): if len(self.sense.red) == 0: return 0 else: return self.sense.red.pop() # (useless - for comparison purposes only) def next_sample(self): if self.available(): # With respect to the SparkFun library, using a deque object # allows us to avoid manually advancing of the tail return True # Polls the sensor for new data def check(self): # Call continuously to poll the sensor for new data. read_pointer = ord(self.get_read_pointer()) write_pointer = ord(self.get_write_pointer()) # Do we have new data? 
if read_pointer != write_pointer: # Calculate the number of readings we need to get from sensor number_of_samples = write_pointer - read_pointer # Wrap condition (return to the beginning of 32 samples) if number_of_samples < 0: number_of_samples += 32 for i in range(number_of_samples): # Read a number of bytes equal to activeLEDs*3 (= 1 sample) fifo_bytes = self.i2c_read_register(MAX30102_FIFO_DATA, self._multi_led_read_mode) # Convert the readings from bytes to integers, depending # on the number of active LEDs if self._active_leds > 0: self.sense.ir.append( self.fifo_bytes_to_int(fifo_bytes[0:3]) ) if self._active_leds > 1: self.sense.red.append( self.fifo_bytes_to_int(fifo_bytes[3:6]) ) return True else: return False # Check for new data but give up after a certain amount of time def safe_check(self, max_time_to_check): mark_time = ticks_ms() while True: if ticks_diff(ticks_ms(), mark_time) > max_time_to_check: # Timeout reached return False if self.check(): # new data found return True sleep_ms(1) ``` ### pulse_oximeter.py ```python= from max30102 import MAX30102 from utime import ticks_ms, ticks_diff class IIR_filter(object): def __init__(self, alpha): self.old_value = 0 self.alpha = alpha def step(self, value): value = (self.old_value*self.alpha + value*(1 - self.alpha)) self.old_value = value return value class AC_extractor(object): def __init__(self): self.max_ac = 0 self.min_ac = 0 self.ac = 0 self.cycle_time_mark = ticks_ms() self.get_time_mark = ticks_ms() self.is_down_period = False def update(self, value_nodc): if value_nodc > 0: if self.max_ac != 0 and self.min_ac != 0: self.is_down_period = False time_intval = ticks_diff(ticks_ms(), self.cycle_time_mark) if 2000 > time_intval > 270: self.ac = self.max_ac - self.min_ac self.get_time_mark = ticks_ms() self.max_ac = 0 self.min_ac = 0 self.cycle_time_mark = ticks_ms() else: if value_nodc > self.max_ac: self.max_ac = value_nodc elif value_nodc < 0 and self.max_ac != 0: self.is_down_period = True if 
value_nodc < self.min_ac: self.min_ac = value_nodc def reset_ac(self): self.ac = 0 class HR_calculator(object): def __init__(self, target_n_beats=5): self.target_n_beats = target_n_beats self.n_beats = 0 self.heart_rate = 0.0 self.tot_intval = 0 self.beat_time_mark = ticks_ms() self.is_beating = False def update(self, is_beating): if self.is_beating == False and is_beating == True: rr_intval = ticks_diff(ticks_ms(), self.beat_time_mark) if 2000 > rr_intval > 270: self.n_beats += 1 self.tot_intval += rr_intval if self.n_beats == self.target_n_beats: tot_intval = self.tot_intval/1000 self.heart_rate = self.target_n_beats/(tot_intval/60) self.tot_intval = 0 self.n_beats = 0 else: self.tot_intval = 0 self.n_beats = 0 self.beat_time_mark = ticks_ms() self.is_beating = is_beating def get_heart_rate(self): return self.heart_rate class Pulse_oximeter(object): def __init__(self, sensor): sensor.set_led_mode(2) self.sensor = sensor self.raw_ir = 0 self.raw_red = 0 self.spo2 = 0 self.heart_rate = 0 self.is_beating = False self.is_available = False self.ac_extractor_ir = AC_extractor() self.ac_extractor_red = AC_extractor() self.dc_remover_ir = IIR_filter(0.99) self.dc_remover_red = IIR_filter(0.99) self.hr_calculator = HR_calculator() def update(self): self.spo2 = 0 self.sensor.check() if (self.sensor.available()): self.is_available = True self.raw_ir = self.sensor.pop_ir_from_storage() self.raw_red = self.sensor.pop_red_from_storage() ir_dc = self.dc_remover_ir.step(self.raw_ir) red_dc = self.dc_remover_red.step(self.raw_red) ir_nodc = self.raw_ir - ir_dc red_nodc = self.raw_red - red_dc self.ac_extractor_ir.update(ir_nodc) self.ac_extractor_red.update(red_nodc) ir_ac = self.ac_extractor_ir.ac red_ac = self.ac_extractor_red.ac time_mark_ir = self.ac_extractor_ir.get_time_mark time_mark_red = self.ac_extractor_red.get_time_mark self.is_beating = self.ac_extractor_red.is_down_period self.hr_calculator.update(self.is_beating) self.heart_rate = 
self.hr_calculator.get_heart_rate() ir_red_intval = abs(ticks_diff(time_mark_ir, time_mark_red)) if ir_ac > 0 and red_ac > 0: if ir_red_intval < 100: ratio = (red_ac/red_dc)/(ir_ac/ir_dc) self.spo2 = -45.060*ratio**2 + 30.354*ratio + 94.845 self.ac_extractor_ir.reset_ac() self.ac_extractor_red.reset_ac() else: self.is_available = False def available(self): return self.is_available def get_spo2(self): return self.spo2 def get_raw_ir(self): return self.raw_ir def get_raw_red(self): return self.raw_red def get_heart_rate(self): return self.heart_rate ```
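
The arithmetic inside `Pulse_oximeter.update()` and `HR_calculator.update()` can be tried on a desktop Python without the sensor. The sketch below is only an illustration of those two formulas: the helper names `spo2_from_ac_dc` and `heart_rate_bpm` are hypothetical (they are not part of the course modules), and the input numbers are made up rather than measured.

```python
def spo2_from_ac_dc(red_ac, red_dc, ir_ac, ir_dc):
    """Hypothetical helper: same ratio and quadratic curve as
    Pulse_oximeter.update() in pulse_oximeter.py."""
    # Ratio of the normalized AC components (red over IR)
    ratio = (red_ac / red_dc) / (ir_ac / ir_dc)
    # Quadratic calibration curve copied from the module above
    return -45.060 * ratio**2 + 30.354 * ratio + 94.845


def heart_rate_bpm(rr_intervals_ms):
    """Hypothetical helper: beats per minute from a list of beat-to-beat
    intervals in milliseconds, mirroring HR_calculator.update()."""
    total_seconds = sum(rr_intervals_ms) / 1000
    return len(rr_intervals_ms) / (total_seconds / 60)


# Made-up example values, for illustration only (not measured data)
spo2 = spo2_from_ac_dc(red_ac=50, red_dc=10000, ir_ac=100, ir_dc=10000)
bpm = heart_rate_bpm([800, 800, 800, 800, 800])
print(round(spo2, 2), round(bpm, 1))
```

With five intervals of 800 ms each, the heart-rate helper averages over 4 seconds, which matches the default `target_n_beats=5` window used by `HR_calculator`.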