---
disqus: ahb0222
GA : G-VF9ZT413CG
---
> [color=#40f1ef][name=LHB阿好伯, 2021/07/16][:earth_africa:](https://www.facebook.com/LHB0222/)
[TOC]
****
智慧電網監控系統:十年免月租的 Things Mobile 多模數據傳輸方案
採用 Raspberry Pi Pico 微控制器
搭配 PZEM-004T 電力監測模組進行精確的電力參數採
並通過 OLED 顯示屏實現即時數據
通過軟硬體設計支持多模通信,實現了 WiFi 和4G網絡(通過 EC800K 4G 模組)的自動切換


這種設計大大提高了數據傳輸的可靠性和適應性
能夠在各種網絡環境下保持穩定運行
[採用了 Things Mobile 的 IoT SIM 卡](https://www.thingsmobile.com/business)

這不僅實現了十年免月租

還支持全球165+國家的網絡覆蓋

系統採用彈性計費模式,只對實際使用的數據量收費

極大地優化了長期運營成本。
這個解決方案的優勢在於其高度的靈活性和可擴展性
它可以輕易適應不同的監控需求
從小型智能家居到大規模工業應用
| 特徵 | 中華電信 NB-IoT | 中華電信 LTE-M | Things Mobile |
|-----|----------------|----------------|---------------|
| 資費方案 | 物聯NB-A/B/C型 | 物聯M1-A/B/C型 | 單一方案 |
| 月租費 | 10元/25元/60元 | 20元/40元/80元 | 無月租費 |
| 啟用費 | 未提供資訊 | 未提供資訊 | 約150元(5美元) |
| 國內數據內含量 | 5MB/15MB/40MB | 5MB/15MB/40MB | 可無內含量 |
| 超量費率(每MB) | 約1.536元 | 約3.072元 | 約3.9元(0.12美元) |
| 數據收費上限 | 750元(含月租費) | 750元(含月租費) | 無上限(可設定) |
| 網路技術 | NB-IoT | LTE-M | 多種網路技術 |
| 適用地區 | 台灣 | 台灣 | 全球165+國家 |
| 合約限制 | 無資料 | 無資料 | 無合約限制 |
```python=
import network
import time
from machine import Pin, I2C, ADC, UART, WDT
from ssd1306 import SSD1306_I2C
from umqtt.simple import MQTTClient
import ujson
from pzem import PZEM
# WiFi 设置
WIFI_SSID = "WIFI_SSID"
WIFI_PASSWORD = "WIFI_PASSWORD"
# MQTT 设置
MQTT_BROKER = "MQTT_BROKER"
MQTT_PORT = 1883
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_CLIENT_ID = "RP2040Client"
MQTT_TOPIC = "MQTT_TOPIC"
WILL_TOPIC = "WILL_TOPIC"
WILL_MESSAGE = b"offline"
# OLED 设置
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_ADDR = 0x3C
OLED_I2C_SCL = 17 # SCL -> GP17
OLED_I2C_SDA = 16 # SDA -> GP16
# LED 设置
LED_PIN = 0 # GPIO0
# PZEM-004T 设置
PZEM_UART_ID = 1
PZEM_TX_PIN = 8 # GP8
PZEM_RX_PIN = 9 # GP9
# EC800K 设置
EC800K_UART_ID = 0
EC800K_TX_PIN = 12 # GP12
EC800K_RX_PIN = 13 # GP13
wifi = None
mqtt_client = None
oled = None
pzem = None
ec800k = None
wdt = None
led = None
def scan_i2c():
i2c = I2C(0, scl=Pin(17), sda=Pin(16))
devices = i2c.scan()
if devices:
for device in devices:
print(f"I2C device found at address: 0x{device:02x}")
else:
print("No I2C device found")
def init_led():
global led
led = Pin(LED_PIN, Pin.OUT)
def init_wifi():
global wifi
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
max_attempts = 10
for attempt in range(max_attempts):
if wifi.isconnected():
print("Connected to WiFi")
return True
print(f"Attempting to connect to WiFi... ({attempt+1}/{max_attempts})")
time.sleep(1)
wdt.feed()
print("Failed to connect to WiFi")
return False
def init_mqtt():
global mqtt_client
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, keepalive=60)
mqtt_client.set_last_will(WILL_TOPIC, WILL_MESSAGE, retain=True, qos=1)
try:
mqtt_client.connect()
print("Connected to MQTT broker")
return True
except:
print("Failed to connect to MQTT broker")
return False
def init_oled():
global oled
try:
i2c_dev = I2C(0, scl=Pin(OLED_I2C_SCL), sda=Pin(OLED_I2C_SDA), freq=400000)
i2c_addr = [hex(ii) for ii in i2c_dev.scan()]
if not i2c_addr:
print('未找到I2C')
return None
print(f"I2C 地址: {i2c_addr[0]}")
print(f"I2C 配置: {i2c_dev}")
oled = SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c_dev)
print("OLED 初始化成功")
return oled
except Exception as e:
print(f"OLED 初始化錯誤: {e}")
return None
def display_info(oled, data, internal_temp):
if oled:
oled.fill(0)
oled.text(f"V:{data['voltage']:.3f} V", 0, 0)
oled.text(f"I:{data['current']:.3f} A", 0, 10)
oled.text(f"P:{data['power']:.3f} W", 0, 20)
oled.text(f"E:{data['energy']:.3f} Wh", 0, 30)
oled.text(f"F:{data['frequency']:.1f}Hz;", 0, 40)
oled.text(f"Temp:{internal_temp:.1f}", 0, 50)
oled.text(f"PF:{data['pf']:.2f}", 72, 40)
oled.show()
def init_pzem():
global pzem
try:
uart = UART(PZEM_UART_ID, baudrate=9600, tx=Pin(PZEM_TX_PIN), rx=Pin(PZEM_RX_PIN))
pzem = PZEM(uart=uart)
if pzem.readAddress():
print("PZEM initialized successfully")
else:
print("Failed to communicate with PZEM")
pzem = None
except Exception as e:
print(f"Error initializing PZEM: {e}")
pzem = None
def init_ec800k():
global ec800k
wdt.feed()
try:
ec800k = UART(EC800K_UART_ID, baudrate=115200, tx=Pin(EC800K_TX_PIN), rx=Pin(EC800K_RX_PIN))
commands = [
"AT+CGATT=0",
"AT+CGATT=1",
"AT+COPS=0",
'AT+QICSGP=1,1,"TM"',
"AT+QIACT=1",
'AT+QMTOPEN=0,"broker.emqx.io",1883',
'AT+QMTCONN=0,"mqttx_7afa236d","",""'
]
for cmd in commands:
ec800k.write(cmd + "\r\n")
time.sleep(2)
wdt.feed()
response = ec800k.read()
print(f"Command: {cmd}")
print(f"Response: {response}")
if b'OK' not in response and b'QMTOPEN: 0,0' not in response and b'QMTCONN: 0,0,0' not in response:
print(f"EC800K initialization failed at command: {cmd}")
return False
print("EC800K initialized successfully")
return True
except Exception as e:
print(f"Error initializing EC800K: {e}")
return False
def read_pzem():
if pzem is not None:
pzem.uart.read()
if pzem.read():
return {
"voltage": pzem.getVoltage(),
"current": pzem.getCurrent(),
"power": pzem.getActivePower(),
"energy": pzem.getActiveEnergy(),
"frequency": pzem.getFrequency(),
"pf": pzem.getPowerFactor()
}
return None
def send_data_via_ec800k(topic, payload):
try:
command = f'AT+QMTPUBEX=0,1,1,0,"{topic}",{len(payload)}\r\n'
ec800k.write(command)
time.sleep(0.5)
response = ec800k.read()
print(f"QMTPUBEX command response: {response}")
if b'>' in response:
ec800k.write(payload + '\r\n')
time.sleep(1)
response = ec800k.read()
print(f"Payload send response: {response}")
if b'+QMTPUBEX: 0,0,0' in response:
print("Data sent successfully via EC800K")
return True
else:
print("Failed to send data via EC800K")
return False
else:
print("Did not receive '>' prompt")
return False
except Exception as e:
print(f"Error sending data via EC800K: {e}")
return False
def send_data(topic, payload):
if wifi.isconnected():
try:
mqtt_client.publish(topic, payload)
print(f"Data sent via MQTT: {payload}")
except:
print("Failed to send data via MQTT")
else:
send_data_via_ec800k(topic, payload)
print(f"Data sent via EC800K: {payload}")
def read_internal_temperature():
adc = ADC(4) # ADC for internal temperature sensor
raw = adc.read_u16()
voltage = (raw * 3.3) / 65535
temperature = 27 - (voltage - 0.706) / 0.001721
return temperature
def read_and_display_pzem_data():
internal_temp = read_internal_temperature()
if pzem is not None:
pzem_data = read_pzem()
if pzem_data:
if oled:
display_info(oled, pzem_data, internal_temp)
else:
print("OLED update skipped")
pzem_data['ConnectionType'] = 'WiFi' if wifi.isconnected() else 'EC800K'
pzem_data['internal_temp'] = internal_temp
send_data("ahb0222/TM/908/PZEM", ujson.dumps(pzem_data))
return pzem_data, internal_temp
else:
print("Failed to read PZEM data")
else:
print("PZEM not initialized, skipping data reading")
return None, internal_temp
def main():
global wdt
init_led()
wdt = WDT(timeout=8000) # 8 seconds timeout
oled = init_oled()
if oled:
display_info(oled, {"voltage": 0, "current": 0, "power": 0, "energy": 0, "frequency": 0, "pf": 0}, 0)
time.sleep(2)
scan_i2c()
wifi_connected = False
mqtt_connected = False
ec800k_initialized = False
while True:
try:
wdt.feed()
if not wifi_connected:
print("Attempting to connect to WiFi...")
wifi_connected = init_wifi()
if wifi_connected:
mqtt_connected = init_mqtt()
if not mqtt_connected:
print("MQTT initialization failed, switching to EC800K")
init_ec800k()
ec800k_initialized = True
else:
print("WiFi connection failed, switching to EC800K")
init_ec800k()
ec800k_initialized = True
if wifi_connected and not wifi.isconnected():
print("WiFi connection lost. Attempting to reconnect...")
wifi_connected = init_wifi()
if wifi_connected:
mqtt_connected = init_mqtt()
else:
print("WiFi reconnection failed, switching to EC800K")
init_ec800k()
ec800k_initialized = True
if pzem is None:
print("Attempting to initialize PZEM...")
init_pzem()
pzem_data, internal_temp = read_and_display_pzem_data()
if pzem_data:
temp_data = {
"tamp": time.time(),
"temp": internal_temp,
"ConnectionType": 'WiFi' if wifi_connected else 'EC800K'
}
if wifi_connected and mqtt_connected:
try:
#send_data(MQTT_TOPIC, ujson.dumps(temp_data))
print("")
except Exception as e:
print(f"Error sending data via MQTT: {e}")
mqtt_connected = False
elif ec800k_initialized:
try:
#send_data_via_ec800k(MQTT_TOPIC, ujson.dumps(temp_data))
print("")
except Exception as e:
print(f"Error sending data via EC800K: {e}")
ec800k_initialized = False
else:
print("No available connection method")
led.toggle()
time.sleep(2)
except Exception as e:
print(f"Error in main loop: {e}")
if __name__ == "__main__":
main()
```
🌟
🌟全文可以至下方連結觀看或是補充
全文分享至
https://www.facebook.com/LHB0222/
https://www.instagram.com/ahb0222/
有疑問想討論的都歡迎於下方留言
喜歡的幫我分享給所有的朋友 \o/
有所錯誤歡迎指教
# [:page_with_curl: 全部文章列表](https://hackmd.io/@LHB-0222/AllWritings)
