---
disqus: ahb0222
GA : G-VF9ZT413CG
---

> [color=#40f1ef][name=LHB阿好伯, 2021/07/16][:earth_africa:](https://www.facebook.com/LHB0222/)

[TOC]

****

智慧電網監控系統:十年免月租的 Things Mobile 多模數據傳輸方案

採用 Raspberry Pi Pico 微控制器
搭配 PZEM-004T 電力監測模組進行精確的電力參數採集
並通過 OLED 顯示屏實現即時數據顯示
通過軟硬體設計支持多模通信,實現了 WiFi 和4G網絡(通過 EC800K 4G 模組)的自動切換

![20240728_222612](https://hackmd.io/_uploads/HyFBTC7tR.jpg)

![image](https://hackmd.io/_uploads/H1REgJVKC.png)

這種設計大大提高了數據傳輸的可靠性和適應性
能夠在各種網絡環境下保持穩定運行

[採用了 Things Mobile 的 IoT SIM 卡](https://www.thingsmobile.com/business)

![Snipaste_2024-07-28_22-29-22](https://hackmd.io/_uploads/S1ygRR7YA.png)

這不僅實現了十年免月租

![Snipaste_2024-07-28_22-31-50](https://hackmd.io/_uploads/rkCZA07KR.png)

還支持全球165+國家的網絡覆蓋

![image](https://hackmd.io/_uploads/SJIEACXKC.png)

系統採用彈性計費模式,只對實際使用的數據量收費

![Snipaste_2024-07-28_22-28-12](https://hackmd.io/_uploads/BJWW0R7tA.png)

極大地優化了長期運營成本。

這個解決方案的優勢在於其高度的靈活性和可擴展性
它可以輕易適應不同的監控需求
從小型智能家居到大規模工業應用

| 特徵 | 中華電信 NB-IoT | 中華電信 LTE-M | Things Mobile |
|-----|----------------|----------------|---------------|
| 資費方案 | 物聯NB-A/B/C型 | 物聯M1-A/B/C型 | 單一方案 |
| 月租費 | 10元/25元/60元 | 20元/40元/80元 | 無月租費 |
| 啟用費 | 未提供資訊 | 未提供資訊 | 約150元(5美元) |
| 國內數據內含量 | 5MB/15MB/40MB | 5MB/15MB/40MB | 可無內含量 |
| 超量費率(每MB) | 約1.536元 | 約3.072元 | 約3.9元(0.12美元) |
| 數據收費上限 | 750元(含月租費) | 750元(含月租費) | 無上限(可設定) |
| 網路技術 | NB-IoT | LTE-M | 多種網路技術 |
| 適用地區 | 台灣 | 台灣 | 全球165+國家 |
| 合約限制 | 無資料 | 無資料 | 無合約限制 |

```python=
import network
import time
from machine import Pin, I2C, ADC, UART, WDT
from ssd1306 import SSD1306_I2C
from umqtt.simple import MQTTClient
import ujson
from pzem import PZEM

# WiFi settings
WIFI_SSID = "WIFI_SSID"
WIFI_PASSWORD = "WIFI_PASSWORD"

# MQTT settings
MQTT_BROKER = "MQTT_BROKER"
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_CLIENT_ID = "RP2040Client"
MQTT_TOPIC = "MQTT_TOPIC"
WILL_TOPIC = "WILL_TOPIC"
WILL_MESSAGE = b"offline"

# OLED settings
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_ADDR = 0x3C
OLED_I2C_SCL = 17  # SCL -> GP17
OLED_I2C_SDA = 16  # SDA -> GP16

# LED settings
LED_PIN = 0  # GPIO0

# PZEM-004T settings
PZEM_UART_ID = 1
PZEM_TX_PIN = 8  # GP8
PZEM_RX_PIN = 9  # GP9

# EC800K settings
EC800K_UART_ID = 0
EC800K_TX_PIN = 12  # GP12
EC800K_RX_PIN = 13  # GP13

# Shared peripheral handles; each one is filled in by its init_* function
# (wdt is created in main()).
wifi = None
mqtt_client = None
oled = None
pzem = None
ec800k = None
wdt = None
led = None


def _feed_watchdog():
    """Feed the watchdog, doing nothing if main() has not created it yet."""
    if wdt is not None:
        wdt.feed()


def scan_i2c():
    """Scan I2C bus 0 on the OLED pins and print every address that answers."""
    i2c = I2C(0, scl=Pin(OLED_I2C_SCL), sda=Pin(OLED_I2C_SDA))
    devices = i2c.scan()
    if devices:
        for device in devices:
            print(f"I2C device found at address: 0x{device:02x}")
    else:
        print("No I2C device found")


def init_led():
    """Configure LED_PIN as an output and store it in the global ``led``."""
    global led
    led = Pin(LED_PIN, Pin.OUT)


def init_wifi():
    """Start the WiFi station interface and wait for a connection.

    Polls once per second for up to 10 attempts, feeding the watchdog
    between polls so the wait does not trigger a reset.

    Returns:
        True once the interface reports it is connected, False otherwise.
    """
    global wifi
    wifi = network.WLAN(network.STA_IF)
    wifi.active(True)
    wifi.connect(WIFI_SSID, WIFI_PASSWORD)

    max_attempts = 10
    for attempt in range(max_attempts):
        if wifi.isconnected():
            print("Connected to WiFi")
            return True
        print(f"Attempting to connect to WiFi... ({attempt+1}/{max_attempts})")
        time.sleep(1)
        _feed_watchdog()

    print("Failed to connect to WiFi")
    return False


def init_mqtt():
    """Create the MQTT client, register the last-will message and connect.

    Returns:
        True if the broker connection succeeded, False otherwise.
    """
    global mqtt_client
    mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT,
                             MQTT_USER, MQTT_PASSWORD, keepalive=60)
    mqtt_client.set_last_will(WILL_TOPIC, WILL_MESSAGE, retain=True, qos=1)
    try:
        mqtt_client.connect()
        print("Connected to MQTT broker")
        return True
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts.
        print("Failed to connect to MQTT broker")
        return False


def init_oled():
    """Initialise the SSD1306 display on I2C bus 0.

    Returns:
        The display object (also stored in the global ``oled``), or None
        when no I2C device answers or initialisation raises.
    """
    global oled
    try:
        i2c_dev = I2C(0, scl=Pin(OLED_I2C_SCL), sda=Pin(OLED_I2C_SDA), freq=400000)
        i2c_addr = [hex(ii) for ii in i2c_dev.scan()]
        if not i2c_addr:
            print('未找到I2C')
            return None
        print(f"I2C 地址: {i2c_addr[0]}")
        print(f"I2C 配置: {i2c_dev}")
        oled = SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c_dev)
        print("OLED 初始化成功")
        return oled
    except Exception as e:
        print(f"OLED 初始化錯誤: {e}")
        return None


def display_info(oled, data, internal_temp):
    """Draw one screen of readings on the display.

    Args:
        oled: Display object; nothing is drawn when it is falsy.
        data: Dict with the keys 'voltage', 'current', 'power', 'energy',
            'frequency' and 'pf'.
        internal_temp: Temperature value shown on the last row.
    """
    if oled:
        oled.fill(0)
        oled.text(f"V:{data['voltage']:.3f} V", 0, 0)
        oled.text(f"I:{data['current']:.3f} A", 0, 10)
        oled.text(f"P:{data['power']:.3f} W", 0, 20)
        oled.text(f"E:{data['energy']:.3f} Wh", 0, 30)
        oled.text(f"F:{data['frequency']:.1f}Hz;", 0, 40)
        oled.text(f"Temp:{internal_temp:.1f}", 0, 50)
        # Power factor shares the frequency row, starting at x = 72.
        oled.text(f"PF:{data['pf']:.2f}", 72, 40)
        oled.show()


def init_pzem():
    """Open the PZEM UART and check that the meter answers.

    Leaves the global ``pzem`` set to None when the address read fails or
    an exception is raised, so the main loop retries on its next pass.
    """
    global pzem
    try:
        uart = UART(PZEM_UART_ID, baudrate=9600,
                    tx=Pin(PZEM_TX_PIN), rx=Pin(PZEM_RX_PIN))
        pzem = PZEM(uart=uart)
        if pzem.readAddress():
            print("PZEM initialized successfully")
        else:
            print("Failed to communicate with PZEM")
            pzem = None
    except Exception as e:
        print(f"Error initializing PZEM: {e}")
        pzem = None


def init_ec800k():
    """Open the EC800K UART and run the AT sequence that connects MQTT.

    Each command is given two seconds to answer. The sequence stops at the
    first reply that contains none of ``OK``, ``QMTOPEN: 0,0`` or
    ``QMTCONN: 0,0,0``.

    NOTE: the broker ("broker.emqx.io"), the client id and the APN ("TM")
    are hard-coded in the command list and do not follow the MQTT_*
    constants above.

    Returns:
        True if every command was acknowledged, False otherwise.
    """
    global ec800k
    _feed_watchdog()
    try:
        ec800k = UART(EC800K_UART_ID, baudrate=115200,
                      tx=Pin(EC800K_TX_PIN), rx=Pin(EC800K_RX_PIN))
        commands = [
            "AT+CGATT=0",
            "AT+CGATT=1",
            "AT+COPS=0",
            'AT+QICSGP=1,1,"TM"',
            "AT+QIACT=1",
            'AT+QMTOPEN=0,"broker.emqx.io",1883',
            'AT+QMTCONN=0,"mqttx_7afa236d","",""'
        ]
        for cmd in commands:
            ec800k.write(cmd + "\r\n")
            time.sleep(2)
            _feed_watchdog()
            # UART.read() returns None when nothing arrived; treat that as
            # an empty reply so the checks below do not raise TypeError.
            response = ec800k.read() or b""
            print(f"Command: {cmd}")
            print(f"Response: {response}")
            if b'OK' not in response and b'QMTOPEN: 0,0' not in response and b'QMTCONN: 0,0,0' not in response:
                print(f"EC800K initialization failed at command: {cmd}")
                return False
        print("EC800K initialized successfully")
        return True
    except Exception as e:
        print(f"Error initializing EC800K: {e}")
        return False


def read_pzem():
    """Read one set of measurements from the PZEM meter.

    Returns:
        Dict with 'voltage', 'current', 'power', 'energy', 'frequency' and
        'pf', or None when the meter is not initialised or the read fails.
    """
    if pzem is not None:
        # Drain any stale bytes from the UART before requesting new data.
        pzem.uart.read()
        if pzem.read():
            return {
                "voltage": pzem.getVoltage(),
                "current": pzem.getCurrent(),
                "power": pzem.getActivePower(),
                "energy": pzem.getActiveEnergy(),
                "frequency": pzem.getFrequency(),
                "pf": pzem.getPowerFactor()
            }
    return None


def send_data_via_ec800k(topic, payload):
    """Publish ``payload`` to ``topic`` through the EC800K modem.

    Sends AT+QMTPUBEX, waits for the '>' prompt, writes the payload, then
    looks for ``+QMTPUBEX: 0,0,0`` in the reply.

    Args:
        topic: MQTT topic string.
        payload: Message string; its length is sent in the command.

    Returns:
        True if the modem confirmed the publish, False otherwise.
    """
    try:
        command = f'AT+QMTPUBEX=0,1,1,0,"{topic}",{len(payload)}\r\n'
        ec800k.write(command)
        time.sleep(0.5)
        # None (no data) is normalised to b"" so a silent modem is reported
        # as a missing prompt instead of as a TypeError.
        response = ec800k.read() or b""
        print(f"QMTPUBEX command response: {response}")
        if b'>' not in response:
            print("Did not receive '>' prompt")
            return False

        ec800k.write(payload + '\r\n')
        time.sleep(1)
        response = ec800k.read() or b""
        print(f"Payload send response: {response}")
        if b'+QMTPUBEX: 0,0,0' in response:
            print("Data sent successfully via EC800K")
            return True
        print("Failed to send data via EC800K")
        return False
    except Exception as e:
        print(f"Error sending data via EC800K: {e}")
        return False


def send_data(topic, payload):
    """Publish ``payload``, preferring WiFi/MQTT and falling back to EC800K.

    The MQTT client is tried whenever WiFi is connected. If WiFi is down,
    or the MQTT publish raises, the message is sent through the modem.

    Args:
        topic: MQTT topic string.
        payload: Message string.

    Returns:
        True if either transport reported success, False otherwise.
    """
    if wifi is not None and wifi.isconnected():
        try:
            mqtt_client.publish(topic, payload)
            print(f"Data sent via MQTT: {payload}")
            return True
        except Exception:
            # Fall through to the modem rather than dropping the message.
            print("Failed to send data via MQTT")

    if send_data_via_ec800k(topic, payload):
        # Only claim success when the modem actually confirmed the publish.
        print(f"Data sent via EC800K: {payload}")
        return True
    return False


def read_internal_temperature():
    """Read ADC channel 4 and convert the reading to a temperature.

    Returns:
        ``27 - (voltage - 0.706) / 0.001721``, with the voltage derived
        from the 16-bit reading against a 3.3 V reference.
    """
    adc = ADC(4)  # ADC for internal temperature sensor
    raw = adc.read_u16()
    voltage = (raw * 3.3) / 65535
    temperature = 27 - (voltage - 0.706) / 0.001721
    return temperature


def read_and_display_pzem_data():
    """Read the meter, refresh the display and publish the readings.

    Returns:
        Tuple ``(pzem_data, internal_temp)``. ``pzem_data`` is None when the
        meter is not initialised or the read failed.
    """
    internal_temp = read_internal_temperature()
    if pzem is not None:
        pzem_data = read_pzem()
        if pzem_data:
            if oled:
                display_info(oled, pzem_data, internal_temp)
            else:
                print("OLED update skipped")
            pzem_data['ConnectionType'] = 'WiFi' if wifi.isconnected() else 'EC800K'
            pzem_data['internal_temp'] = internal_temp
            send_data("ahb0222/TM/908/PZEM", ujson.dumps(pzem_data))
            return pzem_data, internal_temp
        else:
            print("Failed to read PZEM data")
    else:
        print("PZEM not initialized, skipping data reading")
    return None, internal_temp


def main():
    """Initialise the peripherals and run the measure/publish loop forever.

    Each pass feeds the watchdog, brings up WiFi/MQTT (or the EC800K modem
    when that fails), initialises the meter if needed, takes a reading,
    toggles the LED and sleeps two seconds.
    """
    global wdt
    init_led()
    wdt = WDT(timeout=8000)  # 8 seconds timeout

    oled = init_oled()
    if oled:
        # Show an all-zero screen until the first real reading arrives.
        display_info(oled, {"voltage": 0, "current": 0, "power": 0,
                            "energy": 0, "frequency": 0, "pf": 0}, 0)
    time.sleep(2)
    scan_i2c()

    wifi_connected = False
    mqtt_connected = False
    ec800k_initialized = False

    while True:
        try:
            wdt.feed()

            if not wifi_connected:
                print("Attempting to connect to WiFi...")
                wifi_connected = init_wifi()
                if wifi_connected:
                    mqtt_connected = init_mqtt()
                    if not mqtt_connected:
                        print("MQTT initialization failed, switching to EC800K")
                        # Record the real outcome instead of assuming success.
                        ec800k_initialized = init_ec800k()
                else:
                    print("WiFi connection failed, switching to EC800K")
                    ec800k_initialized = init_ec800k()

            if wifi_connected and not wifi.isconnected():
                print("WiFi connection lost. Attempting to reconnect...")
                wifi_connected = init_wifi()
                if wifi_connected:
                    mqtt_connected = init_mqtt()
                else:
                    print("WiFi reconnection failed, switching to EC800K")
                    ec800k_initialized = init_ec800k()

            if pzem is None:
                print("Attempting to initialize PZEM...")
                init_pzem()

            pzem_data, internal_temp = read_and_display_pzem_data()

            if pzem_data:
                # Only referenced by the disabled publish calls below.
                temp_data = {
                    "tamp": time.time(),
                    "temp": internal_temp,
                    "ConnectionType": 'WiFi' if wifi_connected else 'EC800K'
                }

                if wifi_connected and mqtt_connected:
                    try:
                        #send_data(MQTT_TOPIC, ujson.dumps(temp_data))
                        print("")
                    except Exception as e:
                        print(f"Error sending data via MQTT: {e}")
                        mqtt_connected = False
                elif ec800k_initialized:
                    try:
                        #send_data_via_ec800k(MQTT_TOPIC, ujson.dumps(temp_data))
                        print("")
                    except Exception as e:
                        print(f"Error sending data via EC800K: {e}")
                        ec800k_initialized = False
                else:
                    print("No available connection method")

            led.toggle()
            time.sleep(2)

        except Exception as e:
            print(f"Error in main loop: {e}")


if __name__ == "__main__":
    main()
```

🌟 🌟全文可以至下方連結觀看或是補充

全文分享至

https://www.facebook.com/LHB0222/

https://www.instagram.com/ahb0222/

有疑問想討論的都歡迎於下方留言

喜歡的幫我分享給所有的朋友 \o/

有所錯誤歡迎指教

# [:page_with_curl: 全部文章列表](https://hackmd.io/@LHB-0222/AllWritings)

![](https://i.imgur.com/nHEcVmm.jpg)