# MLX90640 calibration (from measurement): true_temp = 1.68 * read_temp - 7.46
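# Hedged sketch: a helper capturing the calibration above (calibrate_temp is a
# new name, not used elsewhere); the detection scripts below apply the same
# linear fit inline.
def calibrate_temp(read_temp):
    return 1.68 * read_temp - 7.46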
"""
Buzzer:
pos:pin12
neg:pin14
"""
import Jetson.GPIO as GPIO
import time
import board, busio
import adafruit_mlx90640
# GPIO.setmode(GPIO.BOARD)
# GPIO.setmode(GPIO.BCM)
GPIO.setmode(GPIO.TEGRA_SOC) # pin_buzzer below uses a TEGRA_SOC pin name, so this mode must be set
print(GPIO.getmode())
"""
TEGRA_SOC = 1000 (used below)
BOARD = 10
BCM = 11
CVM = 1001
"""
# setup I2C & MLX90640
i2c = busio.I2C(board.SCL, board.SDA, frequency=400000) # setup I2C
mlx = adafruit_mlx90640.MLX90640(i2c) # begin MLX90640 with I2C comm
mlx.refresh_rate = adafruit_mlx90640.RefreshRate.REFRESH_8_HZ # 16Hz max
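# Quick wiring check (a sketch): getFrame() fills a 768-value buffer in place
# and raises ValueError on a corrupt frame.
frame = [0] * 768
try:
    mlx.getFrame(frame)
    print("MLX90640 ok, max raw reading {:.1f} C".format(max(frame)))
except ValueError:
    print("MLX90640 returned a bad frame; read again")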
print("------------ok--------------")
# BOARD = 12, BCM = 18, TEGRA_SOC = "DAP4_SCLK"
pin_buzzer = "DAP4_SCLK"
GPIO.setup(pin_buzzer,GPIO.OUT, initial=GPIO.LOW)
while True:
    cmd = input() # read one command per loop instead of calling input() three times
    if cmd == "s": # start buzzer
        GPIO.output(pin_buzzer, GPIO.HIGH)
    if cmd == "p": # pause
        GPIO.output(pin_buzzer, GPIO.LOW)
    if cmd == "q": # quit
        GPIO.output(pin_buzzer, GPIO.LOW)
        break
GPIO.cleanup()
#!/usr/bin/python3
from imutils.video import VideoStream
from imutils.video import FPS
from multiprocessing import Process
from multiprocessing import Queue
import time,board,busio
import adafruit_mlx90640
import numpy as np
import cv2
import datetime as dt
import os
import requests
import jetson.inference
import jetson.utils
import argparse
import sys
import Jetson.GPIO as GPIO
"""
*TABS used throughout code*
Alarm temperature:
no person (init): 150
detects person : 200
Buzzer:
GPIO pin naming : TEGRA_SOC
pos:pin12 (DAP4_SCLK)
neg:pin14
on = GPIO HIGH
off = GPIO LOW
"""
# labels
labels = ["BACKGROUND", "Person", "Stove", "Microwave", "Oven"]
# buzzer
pin_buzzer = "DAP4_SCLK"
GPIO.setmode(GPIO.TEGRA_SOC) # required: pin names below use the TEGRA_SOC scheme
GPIO.setup(pin_buzzer, GPIO.OUT, initial=GPIO.LOW)
# setup I2C & MLX90640
i2c = busio.I2C(board.SCL, board.SDA, frequency=400000) # setup I2C
mlx = adafruit_mlx90640.MLX90640(i2c) # begin MLX90640 with I2C comm
mlx.refresh_rate = adafruit_mlx90640.RefreshRate.REFRESH_8_HZ # 16Hz max
mlx_shape = (24,32)
tdata = np.zeros((24*32,))
alpha = 0.5
WinW = 960
WinH = 720
Ratio = 30 # 960/32=30, camera-to-thermal scale
tframe = np.zeros(mlx_shape) # 24x32 placeholder until the first MLX frame arrives
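# Sketch of the camera-to-thermal mapping the detection loop performs inline:
# dividing 960x720 pixel coordinates by Ratio lands on the 32x24 thermal grid.
def cam_to_thermal(x, y):
    return int(x / Ratio), int(y / Ratio) # (column, row) on the thermal grid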
def td_to_img(f, tmax, tmin):
    # normalize temperatures to 0-255; guard against a flat frame (tmax == tmin)
    if tmax == tmin:
        return np.zeros_like(f, dtype=np.uint8)
    return np.uint8((f - tmin) * 255 / (tmax - tmin))
def tframe2Que(outputQueue):
    while True:
        try:
            mlx.getFrame(tdata) # read MLX temperatures into the flat buffer
        except ValueError:
            continue # the MLX90640 occasionally delivers a corrupt frame; retry
        t_img = np.fliplr(np.reshape(tdata, mlx_shape)) # reshape to 24x32 and mirror
        if not outputQueue.full():
            outputQueue.put(t_img) # drop the frame if the consumer is behind
print("[INFO] starting MLX90640 process...")
tempQueue = Queue(maxsize=1)
p = Process(target=tframe2Que, args=(tempQueue,))
p.daemon = True
p.start()
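# daemon=True ties the reader's lifetime to the main script; on Linux the child
# inherits mlx and tdata via fork, so nothing else needs to be passed in.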
# line notify
workdir = '/home/huang/jetson-inference/python/training/detection/ssd'
LINE_ACCESS_TOKEN='NNtLMvjiviTH5dWCnqAjegMICRJY0VTZdrglcCwRpcP' # JetsonNano
url = 'https://notify-api.line.me/api/notify'
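# Minimal text-only LINE Notify call (a sketch; line_notify_text is a new name).
# The alarm code below posts the same way, adding a files= payload for images.
def line_notify_text(msg):
    headers = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
    return requests.post(url, headers=headers, data={'message': msg})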
# create blank windows/buffers so cv2.waitKey() and the overlay code work before the first thermal frame
thermal_image = np.zeros(shape=[240, 320, 3], dtype=np.uint8)
cv2.imshow("Thermal Image", thermal_image)
c_img = np.zeros((24, 32, 3), dtype=np.uint8) # placeholder color-mapped frame
# parse the command line
parser = argparse.ArgumentParser(description="Locate objects in a live camera stream using an object detection DNN.",
formatter_class=argparse.RawTextHelpFormatter, epilog=jetson.inference.detectNet.Usage() +
jetson.utils.videoSource.Usage() + jetson.utils.videoOutput.Usage() + jetson.utils.logUsage())
parser.add_argument("input_URI", type=str, default="", nargs='?', help="URI of the input stream")
parser.add_argument("output_URI", type=str, default="", nargs='?', help="URI of the output stream")
parser.add_argument("--network", type=str, default="ssd-mobilenet-v2", help="pre-trained model to load (see below for options)")
parser.add_argument("--overlay", type=str, default="box,labels,conf", help="detection overlay flags (e.g. --overlay=box,labels,conf)\nvalid combinations are: 'box', 'labels', 'conf', 'none'")
parser.add_argument("--threshold", type=float, default=0.5, help="minimum detection threshold to use")
is_headless = ["--headless"] if sys.argv[0].find('console.py') != -1 else [""]
try:
opt = parser.parse_known_args()[0]
except:
print("")
parser.print_help()
sys.exit(0)
# create video output object
output = jetson.utils.videoOutput(opt.output_URI, argv=sys.argv+is_headless)
font = jetson.utils.cudaFont()
# load the object detection network
net = jetson.inference.detectNet(opt.network, sys.argv, opt.threshold)
# create video sources
input = jetson.utils.videoSource(opt.input_URI, argv=sys.argv)
alarm_temp = 150
s_key = False
buzz_key = False
# process frames until the user exits
while True:
# capture the next image
img = input.Capture() # img: original frame in cuda memory
detections = net.Detect(img, overlay=opt.overlay)
if not tempQueue.empty():
tframe = tempQueue.get() # tframe: thermal data
tmax = tframe.max()
tmin = tframe.min()
nl_tframe = td_to_img(tframe, tmax, tmin) # nl_tframe: normalized thermal data
# thermal data to color map
c_img = cv2.applyColorMap(nl_tframe, cv2.COLORMAP_JET) # c_img: color mapped thermal image, cv2.COLORMAP_HSV
cs_img = cv2.resize(c_img, (320,240), interpolation = cv2.INTER_CUBIC) # cs_img: small color mapped thermal image
cv2.imshow("Thermal Image", cs_img)
# keyboard input
key = cv2.waitKey(1) & 0xFF
if key == ord("q"): # quit
GPIO.output(pin_buzzer, GPIO.LOW)
break
if key == ord("s"): # simulate alarm
s_key = True # s_key: flag, S key pressed
# convert cudaimg to cv2img
np_img = jetson.utils.cudaToNumpy(img, WinW, WinH, 3) # np_img: 3-channel numpy view of the cuda frame
cv_img = cv2.cvtColor(np_img.astype(np.uint8), cv2.COLOR_RGB2BGR) # cv_img: opencv image (RGB2BGR matches the 3-channel view)
# overlay thermal image
cl_img = cv2.resize(c_img, (WinW,WinH), interpolation = cv2.INTER_CUBIC) # cl_img: large color mapped thermal image
ov_img = cv2.addWeighted(cv_img, alpha, cl_img, 1-alpha, 0) # ov_img: cv_img + cl_img
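# addWeighted blends per pixel: alpha*cv_img + (1-alpha)*cl_img, an even mix at alpha = 0.5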
if (detections is not None) and (tframe is not None):
# check if has person
has_person = 0
for detection in detections:
if labels[detection.ClassID] == 'Person':
has_person += 1
# print("person + 1")
# set alarm temperature and buzz_key
if has_person > 0:
alarm_temp = 200
buzz_key = False
else:
alarm_temp = 150
# go through each detection
for detection in detections:
box = np.array([detection.Left, detection.Top, detection.Right, detection.Bottom])/Ratio
(startX, startY, endX, endY) = box.astype("int")
if startX>0 : startX -= 1
if startY>0 : startY -= 1
if endX <32 : endX += 1
if endY <24 : endY += 1
# true_temp = 1.68 * read_temp - 7.46
tmax = tframe[startY:endY, startX:endX].max() * 1.68 - 7.46
text = labels[detection.ClassID] + " Tmax={:.1f} C".format(tmax)
font.OverlayText(img, img.width, img.height, text, int(detection.Left), int(detection.Top), font.White, font.Gray20)
# check if need alarm
if (tmax) > alarm_temp:
s_key = True
if s_key == True:
cv2.rectangle(cv_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(cv_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
cv2.rectangle(ov_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(ov_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
# line notify + alarm
if s_key == True:
# start buzzer
buzz_key = True
# save cv_img
fname = 'cv_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, cv_img)
print('Saving image ', fname)
# line message: post the saved image, closing the file handle afterwards
LINE_MSG = 'Object Detected'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
# save ov_img
fname = 'ov_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, ov_img)
print('Saving thermal overlay image ', fname)
# line message: post the overlay image, closing the file handle afterwards
LINE_MSG = 'Thermal Overlay Image'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
s_key = False
# buzzer
if buzz_key:
    GPIO.output(pin_buzzer, GPIO.HIGH)
else:
    GPIO.output(pin_buzzer, GPIO.LOW)
output.Render(img)
# print the detections
# print("detected {:d} objects in image".format(len(detections)))
# for detection in detections:
# print(detection)
# print out performance info
# net.PrintProfilerTimes()
# exit on input/output EOS
if not input.IsStreaming() or not output.IsStreaming():
break
# update the title bar
output.SetStatus("{:s} | Network {:.0f} FPS".format(opt.network, net.GetNetworkFPS()))
cv2.destroyAllWindows()
GPIO.cleanup()
"""
import Jetson.GPIO as GPIO
import time
pin_buzzer=12
GPIO.setmode(GPIO.BOARD)
GPIO.setup(pin_buzzer,GPIO.OUT, initial=GPIO.LOW)
while True:
if input() == "s": # start buzzer
GPIO.output(pin_buzzer, GPIO.HIGH)
if input() == "p": # pause
GPIO.output(pin_buzzer, GPIO.LOW)
if input() == "q":
GPIO.output(pin_buzzer, GPIO.LOW)
break
GPIO.cleanup()
"""
# UNDONE
#!/usr/bin/python3
from imutils.video import VideoStream
from imutils.video import FPS
from multiprocessing import Process
from multiprocessing import Queue
import time,board,busio
import adafruit_mlx90640
import numpy as np
import cv2
import datetime as dt
import os
import requests
import jetson.inference
import jetson.utils
import argparse
import sys
import Jetson.GPIO as GPIO
"""
*TABS used throughout code*
Alarm temperature:
per-class thresholds set in alarm_temp[] (indexed like labels[])
buzzer pauses while a person is in view
Buzzer:
GPIO pin naming : TEGRA_SOC
pos:pin12 (DAP4_SCLK)
neg:pin14
on = GPIO HIGH
off = GPIO LOW
"""
# labels
labels = ["BACKGROUND", "Person", "Stove", "Microwave", "Oven"]
alarm_temp = [60, 300, 300, 100, 100]
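# alarm_temp[i] pairs with labels[i] (BACKGROUND, Person, Stove, Microwave, Oven);
# a one-line guard (an added sketch) keeps the two lists in lockstep.
assert len(alarm_temp) == len(labels)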
# buzzer
pin_buzzer = "DAP4_SCLK"
GPIO.setmode(GPIO.TEGRA_SOC) # required: pin names below use the TEGRA_SOC scheme
GPIO.setup(pin_buzzer, GPIO.OUT, initial=GPIO.LOW)
# setup I2C & MLX90640
i2c = busio.I2C(board.SCL, board.SDA, frequency=400000) # setup I2C
mlx = adafruit_mlx90640.MLX90640(i2c) # begin MLX90640 with I2C comm
mlx.refresh_rate = adafruit_mlx90640.RefreshRate.REFRESH_8_HZ # 16Hz max
mlx_shape = (24,32)
tdata = np.zeros((24*32,))
alpha = 0.5
WinW = 960
WinH = 720
Ratio = 30 # 960/32=30, camera-to-thermal scale
tframe = np.zeros(mlx_shape) # 24x32 placeholder until the first MLX frame arrives
def td_to_img(f, tmax, tmin):
    # normalize temperatures to 0-255; guard against a flat frame (tmax == tmin)
    if tmax == tmin:
        return np.zeros_like(f, dtype=np.uint8)
    return np.uint8((f - tmin) * 255 / (tmax - tmin))
def tframe2Que(outputQueue):
    while True:
        try:
            mlx.getFrame(tdata) # read MLX temperatures into the flat buffer
        except ValueError:
            continue # the MLX90640 occasionally delivers a corrupt frame; retry
        t_img = np.fliplr(np.reshape(tdata, mlx_shape)) # reshape to 24x32 and mirror
        if not outputQueue.full():
            outputQueue.put(t_img) # drop the frame if the consumer is behind
print("[INFO] starting MLX90640 process...")
tempQueue = Queue(maxsize=1)
p = Process(target=tframe2Que, args=(tempQueue,))
p.daemon = True
p.start()
# line notify
workdir = '/home/huang/jetson-inference/python/training/detection/ssd'
LINE_ACCESS_TOKEN='NNtLMvjiviTH5dWCnqAjegMICRJY0VTZdrglcCwRpcP' # JetsonNano
url = 'https://notify-api.line.me/api/notify'
# create blank windows/buffers so cv2.waitKey() and the overlay code work before the first thermal frame
thermal_image = np.zeros(shape=[240, 320, 3], dtype=np.uint8)
cv2.imshow("Thermal Image", thermal_image)
c_img = np.zeros((24, 32, 3), dtype=np.uint8) # placeholder color-mapped frame
# parse the command line
parser = argparse.ArgumentParser(description="Locate objects in a live camera stream using an object detection DNN.",
formatter_class=argparse.RawTextHelpFormatter, epilog=jetson.inference.detectNet.Usage() +
jetson.utils.videoSource.Usage() + jetson.utils.videoOutput.Usage() + jetson.utils.logUsage())
parser.add_argument("input_URI", type=str, default="", nargs='?', help="URI of the input stream")
parser.add_argument("output_URI", type=str, default="", nargs='?', help="URI of the output stream")
parser.add_argument("--network", type=str, default="ssd-mobilenet-v2", help="pre-trained model to load (see below for options)")
parser.add_argument("--overlay", type=str, default="box,labels,conf", help="detection overlay flags (e.g. --overlay=box,labels,conf)\nvalid combinations are: 'box', 'labels', 'conf', 'none'")
parser.add_argument("--threshold", type=float, default=0.5, help="minimum detection threshold to use")
is_headless = ["--headless"] if sys.argv[0].find('console.py') != -1 else [""]
try:
opt = parser.parse_known_args()[0]
except:
print("")
parser.print_help()
sys.exit(0)
# create video output object
output = jetson.utils.videoOutput(opt.output_URI, argv=sys.argv+is_headless)
font = jetson.utils.cudaFont()
# load the object detection network
net = jetson.inference.detectNet(opt.network, sys.argv, opt.threshold)
# create video sources
input = jetson.utils.videoSource(opt.input_URI, argv=sys.argv)
s_key = False
buzz_key = False
# process frames until the user exits
while True:
# capture the next image
img = input.Capture() # img: original frame in cuda memory
detections = net.Detect(img, overlay=opt.overlay)
if not tempQueue.empty():
tframe = tempQueue.get() # tframe: thermal data
tmax = tframe.max()
tmin = tframe.min()
nl_tframe = td_to_img(tframe, tmax, tmin) # nl_tframe: normalized thermal data
# thermal data to color map
c_img = cv2.applyColorMap(nl_tframe, cv2.COLORMAP_JET) # c_img: color mapped thermal image, cv2.COLORMAP_HSV
cs_img = cv2.resize(c_img, (320,240), interpolation = cv2.INTER_CUBIC) # cs_img: small color mapped thermal image
cv2.imshow("Thermal Image", cs_img)
# keyboard input
key = cv2.waitKey(1) & 0xFF
if key == ord("q"): # quit
GPIO.output(pin_buzzer, GPIO.LOW)
break
if key == ord("s"): # simulate alarm
s_key = True # s_key: flag, S key pressed
# convert cudaimg to cv2img
np_img = jetson.utils.cudaToNumpy(img, WinW, WinH, 3) # np_img: 3-channel numpy view of the cuda frame
cv_img = cv2.cvtColor(np_img.astype(np.uint8), cv2.COLOR_RGB2BGR) # cv_img: opencv image (RGB2BGR matches the 3-channel view)
# overlay thermal image
cl_img = cv2.resize(c_img, (WinW,WinH), interpolation = cv2.INTER_CUBIC) # cl_img: large color mapped thermal image
ov_img = cv2.addWeighted(cv_img, alpha, cl_img, 1-alpha, 0) # ov_img: cv_img + cl_img
if (detections is not None) and (tframe is not None):
# check if has person
has_person = 0
for detection in detections:
if labels[detection.ClassID] == 'Person':
has_person += 1
# print("person + 1")
# pause the buzzer while a person is present; alarm_temp stays a per-class list
# (overwriting it with a scalar here would break the alarm_temp[ClassID] lookup below)
if has_person > 0:
    buzz_key = False
# go through each detection
for detection in detections:
box = np.array([detection.Left, detection.Top, detection.Right, detection.Bottom])/Ratio
(startX, startY, endX, endY) = box.astype("int")
if startX>0 : startX -= 1
if startY>0 : startY -= 1
if endX <32 : endX += 1
if endY <24 : endY += 1
# true_temp = 1.68 * read_temp - 7.46
tmax = tframe[startY:endY, startX:endX].max() * 1.68 - 7.46
text = labels[detection.ClassID] + " Tmax={:.1f} C".format(tmax)
font.OverlayText(img, img.width, img.height, text, int(detection.Left), int(detection.Top), font.White, font.Gray20)
# check if need alarm
if (tmax) > alarm_temp[detection.ClassID]:
s_key = True
if s_key == True:
cv2.rectangle(cv_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(cv_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
cv2.rectangle(ov_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(ov_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
# line notify + alarm
if s_key == True:
# start buzzer
buzz_key = True
# save cv_img
fname = 'cv_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, cv_img)
print('Saving image ', fname)
# line message: post the saved image, closing the file handle afterwards
LINE_MSG = 'Object Detected'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
# save ov_img
fname = 'ov_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, ov_img)
print('Saving thermal overlay image ', fname)
# line message: post the overlay image, closing the file handle afterwards
LINE_MSG = 'Thermal Overlay Image'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
s_key = False
# buzzer
if buzz_key:
    GPIO.output(pin_buzzer, GPIO.HIGH)
else:
    GPIO.output(pin_buzzer, GPIO.LOW)
output.Render(img)
# print the detections
# print("detected {:d} objects in image".format(len(detections)))
# for detection in detections:
# print(detection)
# print out performance info
# net.PrintProfilerTimes()
# exit on input/output EOS
if not input.IsStreaming() or not output.IsStreaming():
break
# update the title bar
output.SetStatus("{:s} | Network {:.0f} FPS".format(opt.network, net.GetNetworkFPS()))
cv2.destroyAllWindows()
GPIO.cleanup()
"""
import Jetson.GPIO as GPIO
import time
pin_buzzer=12
GPIO.setmode(GPIO.BOARD)
GPIO.setup(pin_buzzer,GPIO.OUT, initial=GPIO.LOW)
while True:
if input() == "s": # start buzzer
GPIO.output(pin_buzzer, GPIO.HIGH)
if input() == "p": # pause
GPIO.output(pin_buzzer, GPIO.LOW)
if input() == "q":
GPIO.output(pin_buzzer, GPIO.LOW)
break
GPIO.cleanup()
"""
#!/usr/bin/python3
from imutils.video import VideoStream
from imutils.video import FPS
from multiprocessing import Process
from multiprocessing import Queue
import time,board,busio
import adafruit_mlx90640
import numpy as np
import cv2
import datetime as dt
import os
import requests
import jetson.inference
import jetson.utils
import argparse
import sys
import Jetson.GPIO as GPIO
"""
*TABS used throughout code*
Alarm temperature:
per-class thresholds set in alarm_temp[] (indexed like labels[])
buzzer pauses while a person is in view
Buzzer:
GPIO pin naming : TEGRA_SOC
pos:pin12 (DAP4_SCLK)
neg:pin14
on = GPIO HIGH
off = GPIO LOW
"""
# labels
labels = ["BACKGROUND", "Person", "Stove", "Microwave", "Oven"]
alarm_temp = [60, 300, 300, 100, 100]
# buzzer
pin_buzzer = "DAP4_SCLK"
GPIO.setmode(GPIO.TEGRA_SOC) # required: pin names below use the TEGRA_SOC scheme
GPIO.setup(pin_buzzer, GPIO.OUT, initial=GPIO.LOW)
# setup I2C & MLX90640
i2c = busio.I2C(board.SCL, board.SDA, frequency=400000) # setup I2C
mlx = adafruit_mlx90640.MLX90640(i2c) # begin MLX90640 with I2C comm
mlx.refresh_rate = adafruit_mlx90640.RefreshRate.REFRESH_8_HZ # 16Hz max
mlx_shape = (24,32)
tdata = np.zeros((24*32,))
alpha = 0.5
WinW = 960
WinH = 720
Ratio = 30 # 960/32=30, camera-to-thermal scale
tframe = np.zeros(mlx_shape) # 24x32 placeholder until the first MLX frame arrives
def td_to_img(f, tmax, tmin):
    # normalize temperatures to 0-255; guard against a flat frame (tmax == tmin)
    if tmax == tmin:
        return np.zeros_like(f, dtype=np.uint8)
    return np.uint8((f - tmin) * 255 / (tmax - tmin))
def tframe2Que(outputQueue):
    while True:
        try:
            mlx.getFrame(tdata) # read MLX temperatures into the flat buffer
        except ValueError:
            continue # the MLX90640 occasionally delivers a corrupt frame; retry
        t_img = np.fliplr(np.reshape(tdata, mlx_shape)) # reshape to 24x32 and mirror
        if not outputQueue.full():
            outputQueue.put(t_img) # drop the frame if the consumer is behind
print("[INFO] starting MLX90640 process...")
tempQueue = Queue(maxsize=1)
p = Process(target=tframe2Que, args=(tempQueue,))
p.daemon = True
p.start()
# line notify
workdir = '/home/huang/jetson-inference/python/training/detection/ssd'
LINE_ACCESS_TOKEN='NNtLMvjiviTH5dWCnqAjegMICRJY0VTZdrglcCwRpcP' # JetsonNano
url = 'https://notify-api.line.me/api/notify'
# create blank windows/buffers so cv2.waitKey() and the overlay code work before the first thermal frame
thermal_image = np.zeros(shape=[240, 320, 3], dtype=np.uint8)
cv2.imshow("Thermal Image", thermal_image)
c_img = np.zeros((24, 32, 3), dtype=np.uint8) # placeholder color-mapped frame
# parse the command line
parser = argparse.ArgumentParser(description="Locate objects in a live camera stream using an object detection DNN.",
formatter_class=argparse.RawTextHelpFormatter, epilog=jetson.inference.detectNet.Usage() +
jetson.utils.videoSource.Usage() + jetson.utils.videoOutput.Usage() + jetson.utils.logUsage())
parser.add_argument("input_URI", type=str, default="", nargs='?', help="URI of the input stream")
parser.add_argument("output_URI", type=str, default="", nargs='?', help="URI of the output stream")
parser.add_argument("--network", type=str, default="ssd-mobilenet-v2", help="pre-trained model to load (see below for options)")
parser.add_argument("--overlay", type=str, default="box,labels,conf", help="detection overlay flags (e.g. --overlay=box,labels,conf)\nvalid combinations are: 'box', 'labels', 'conf', 'none'")
parser.add_argument("--threshold", type=float, default=0.5, help="minimum detection threshold to use")
is_headless = ["--headless"] if sys.argv[0].find('console.py') != -1 else [""]
try:
opt = parser.parse_known_args()[0]
except:
print("")
parser.print_help()
sys.exit(0)
# create video output object
output = jetson.utils.videoOutput(opt.output_URI, argv=sys.argv+is_headless)
font = jetson.utils.cudaFont()
# load the object detection network
net = jetson.inference.detectNet(opt.network, sys.argv, opt.threshold)
# create video sources
input = jetson.utils.videoSource(opt.input_URI, argv=sys.argv)
s_key = False
buzz_key = False
# process frames until the user exits
while True:
# capture the next image
img = input.Capture() # img: original frame in cuda memory
detections = net.Detect(img, overlay=opt.overlay)
if not tempQueue.empty():
tframe = tempQueue.get() # tframe: thermal data
tmax = tframe.max()
tmin = tframe.min()
nl_tframe = td_to_img(tframe, tmax, tmin) # nl_tframe: normalized thermal data
# thermal data to color map
c_img = cv2.applyColorMap(nl_tframe, cv2.COLORMAP_JET) # c_img: color mapped thermal image, cv2.COLORMAP_HSV
cs_img = cv2.resize(c_img, (320,240), interpolation = cv2.INTER_CUBIC) # cs_img: small color mapped thermal image
cv2.imshow("Thermal Image", cs_img)
# keyboard input
key = cv2.waitKey(1) & 0xFF
if key == ord("q"): # quit
GPIO.output(pin_buzzer, GPIO.LOW)
break
if key == ord("s"): # simulate alarm
s_key = True # s_key: flag, S key pressed
# convert cudaimg to cv2img
np_img = jetson.utils.cudaToNumpy(img, WinW, WinH, 3) # np_img: 3-channel numpy view of the cuda frame
cv_img = cv2.cvtColor(np_img.astype(np.uint8), cv2.COLOR_RGB2BGR) # cv_img: opencv image (RGB2BGR matches the 3-channel view)
# overlay thermal image
cl_img = cv2.resize(c_img, (WinW,WinH), interpolation = cv2.INTER_CUBIC) # cl_img: large color mapped thermal image
ov_img = cv2.addWeighted(cv_img, alpha, cl_img, 1-alpha, 0) # ov_img: cv_img + cl_img
# copy tframe so masking detections below does not corrupt the live thermal data
tframe_copy = tframe.copy()
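# Each detection's cells are negated below, making them non-positive, so
# tframe_copy.max() afterwards returns the hottest pixel outside all detections
# (the BACKGROUND max).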
if (detections is not None) and (tframe is not None):
# check if has person
has_person = 0
for detection in detections:
if labels[detection.ClassID] == 'Person':
has_person += 1
# print("person + 1")
# pause the buzzer while a person is present; alarm_temp stays a per-class list
# (overwriting it with a scalar here would break the alarm_temp[ClassID] lookup below)
if has_person > 0:
    buzz_key = False
# go through each detection
for detection in detections:
box = np.array([detection.Left, detection.Top, detection.Right, detection.Bottom])/Ratio
(startX, startY, endX, endY) = box.astype("int")
if startX>0 : startX -= 1
if startY>0 : startY -= 1
if endX <32 : endX += 1
if endY <24 : endY += 1
# mask out the detection region so the BACKGROUND max ignores it
tframe_copy[startY:endY, startX:endX] *= -1
# true_temp = 1.68 * read_temp - 7.46
tmax = tframe[startY:endY, startX:endX].max() * 1.68 - 7.46
text = labels[detection.ClassID] + " Tmax={:.1f} C".format(tmax)
font.OverlayText(img, img.width, img.height, text, int(detection.Left), int(detection.Top), font.White, font.Gray20)
# check if need alarm
if (tmax) >= alarm_temp[detection.ClassID]:
s_key = True
if s_key == True:
cv2.rectangle(cv_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(cv_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
cv2.rectangle(ov_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(ov_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
# check BACKGROUND tmax
tmax_bg = tframe_copy.max() * 1.68 - 7.46
if tmax_bg >= alarm_temp[0]:
s_key = True
# line notify + alarm
if s_key == True:
# start buzzer
buzz_key = True
# save cv_img
fname = 'cv_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, cv_img)
print('Saving image ', fname)
# line message: post the saved image, closing the file handle afterwards
LINE_MSG = 'Object Detected'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
# save ov_img
fname = 'ov_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, ov_img)
print('Saving thermal overlay image ', fname)
# line message: post the overlay image, closing the file handle afterwards
LINE_MSG = 'Thermal Overlay Image'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
s_key = False
# buzzer
if buzz_key:
    GPIO.output(pin_buzzer, GPIO.HIGH)
else:
    GPIO.output(pin_buzzer, GPIO.LOW)
output.Render(img)
# print the detections
# print("detected {:d} objects in image".format(len(detections)))
# for detection in detections:
# print(detection)
# print out performance info
# net.PrintProfilerTimes()
# exit on input/output EOS
if not input.IsStreaming() or not output.IsStreaming():
break
# update the title bar
output.SetStatus("{:s} | Network {:.0f} FPS".format(opt.network, net.GetNetworkFPS()))
cv2.destroyAllWindows()
GPIO.cleanup()
"""
import Jetson.GPIO as GPIO
import time
pin_buzzer=12
GPIO.setmode(GPIO.BOARD)
GPIO.setup(pin_buzzer,GPIO.OUT, initial=GPIO.LOW)
while True:
if input() == "s": # start buzzer
GPIO.output(pin_buzzer, GPIO.HIGH)
if input() == "p": # pause
GPIO.output(pin_buzzer, GPIO.LOW)
if input() == "q":
GPIO.output(pin_buzzer, GPIO.LOW)
break
GPIO.cleanup()
"""
#!/usr/bin/python3
from imutils.video import VideoStream
from imutils.video import FPS
from multiprocessing import Process
from multiprocessing import Queue
import time,board,busio
import adafruit_mlx90640
import numpy as np
import cv2
import datetime as dt
import os
import requests
import jetson.inference
import jetson.utils
import argparse
import sys
import Jetson.GPIO as GPIO
"""
*spaces used throughout code*
Alarm temperature:
per-class thresholds set in alarm_temp[] (indexed like labels[])
buzzer pauses while a person is in view
Buzzer:
GPIO pin naming : TEGRA_SOC
pos:pin12 (DAP4_SCLK)
neg:pin14
on = GPIO HIGH
off = GPIO LOW
"""
# labels
labels = ["BACKGROUND", "Person", "Stove", "Microwave", "Oven"]
alarm_temp = [150, 400, 400, 100, 300]
# buzzer
pin_buzzer = "DAP4_SCLK"
GPIO.setmode(GPIO.TEGRA_SOC) # required: pin names below use the TEGRA_SOC scheme
GPIO.setup(pin_buzzer, GPIO.OUT, initial=GPIO.LOW)
# setup I2C & MLX90640
i2c = busio.I2C(board.SCL, board.SDA, frequency=400000) # setup I2C
mlx = adafruit_mlx90640.MLX90640(i2c) # begin MLX90640 with I2C comm
mlx.refresh_rate = adafruit_mlx90640.RefreshRate.REFRESH_8_HZ # 16Hz max
mlx_shape = (24,32)
tdata = np.zeros((24*32,))
alpha = 0.5
WinW = 960
WinH = 720
Ratio = 30 # 960/32=30, camera-to-thermal scale
tframe = np.zeros(mlx_shape) # 24x32 placeholder until the first MLX frame arrives
def td_to_img(f, tmax, tmin):
    # normalize temperatures to 0-255; guard against a flat frame (tmax == tmin)
    if tmax == tmin:
        return np.zeros_like(f, dtype=np.uint8)
    return np.uint8((f - tmin) * 255 / (tmax - tmin))
def tframe2Que(outputQueue):
    while True:
        try:
            mlx.getFrame(tdata) # read MLX temperatures into the flat buffer
        except ValueError:
            continue # the MLX90640 occasionally delivers a corrupt frame; retry
        t_img = np.fliplr(np.reshape(tdata, mlx_shape)) # reshape to 24x32 and mirror
        if not outputQueue.full():
            outputQueue.put(t_img) # drop the frame if the consumer is behind
print("[INFO] starting MLX90640 process...")
tempQueue = Queue(maxsize=1)
p = Process(target=tframe2Que, args=(tempQueue,))
p.daemon = True
p.start()
# line notify
workdir = '/home/huang/jetson-inference/python/training/detection/ssd'
LINE_ACCESS_TOKEN='NNtLMvjiviTH5dWCnqAjegMICRJY0VTZdrglcCwRpcP' # JetsonNano
url = 'https://notify-api.line.me/api/notify'
# create blank windows/buffers so cv2.waitKey() and the overlay code work before the first thermal frame
thermal_image = np.zeros(shape=[240, 320, 3], dtype=np.uint8)
cv2.imshow("Thermal Image", thermal_image)
c_img = np.zeros((24, 32, 3), dtype=np.uint8) # placeholder color-mapped frame
# parse the command line
parser = argparse.ArgumentParser(description="Locate objects in a live camera stream using an object detection DNN.",
formatter_class=argparse.RawTextHelpFormatter, epilog=jetson.inference.detectNet.Usage() +
jetson.utils.videoSource.Usage() + jetson.utils.videoOutput.Usage() + jetson.utils.logUsage())
parser.add_argument("input_URI", type=str, default="", nargs='?', help="URI of the input stream")
parser.add_argument("output_URI", type=str, default="", nargs='?', help="URI of the output stream")
parser.add_argument("--network", type=str, default="ssd-mobilenet-v2", help="pre-trained model to load (see below for options)")
parser.add_argument("--overlay", type=str, default="box,labels,conf", help="detection overlay flags (e.g. --overlay=box,labels,conf)\nvalid combinations are: 'box', 'labels', 'conf', 'none'")
parser.add_argument("--threshold", type=float, default=0.5, help="minimum detection threshold to use")
is_headless = ["--headless"] if sys.argv[0].find('console.py') != -1 else [""]
try:
opt = parser.parse_known_args()[0]
except:
print("")
parser.print_help()
sys.exit(0)
# create video output object
output = jetson.utils.videoOutput(opt.output_URI, argv=sys.argv+is_headless)
font = jetson.utils.cudaFont()
# load the object detection network
net = jetson.inference.detectNet(opt.network, sys.argv, opt.threshold)
# create video sources
input = jetson.utils.videoSource(opt.input_URI, argv=sys.argv)
s_key = False
buzz_key = False
overheat_obj = 'Default'
# process frames until the user exits
while True:
# capture the next image
img = input.Capture() # img: original frame in cuda memory
detections = net.Detect(img, overlay=opt.overlay)
if not tempQueue.empty():
tframe = tempQueue.get() # tframe: thermal data
tmax = tframe.max()
tmin = tframe.min()
nl_tframe = td_to_img(tframe, tmax, tmin) # nl_tframe: normalized thermal data
# thermal data to color map
c_img = cv2.applyColorMap(nl_tframe, cv2.COLORMAP_JET) # c_img: color mapped thermal image, cv2.COLORMAP_HSV
cs_img = cv2.resize(c_img, (320,240), interpolation = cv2.INTER_CUBIC) # cs_img: small color mapped thermal image
cv2.imshow("Thermal Image", cs_img)
# keyboard input
key = cv2.waitKey(1) & 0xFF
if key == ord("q"): # quit
GPIO.output(pin_buzzer, GPIO.LOW)
break
if key == ord("s"): # simulate alarm
s_key = True # s_key: flag, S key pressed
# convert cudaimg to cv2img
np_img = jetson.utils.cudaToNumpy(img, WinW, WinH, 3) # np_img: 3-channel numpy view of the cuda frame
cv_img = cv2.cvtColor(np_img.astype(np.uint8), cv2.COLOR_RGB2BGR) # cv_img: opencv image (RGB2BGR matches the 3-channel view)
# overlay thermal image
cl_img = cv2.resize(c_img, (WinW,WinH), interpolation = cv2.INTER_CUBIC) # cl_img: large color mapped thermal image
ov_img = cv2.addWeighted(cv_img, alpha, cl_img, 1-alpha, 0) # ov_img: cv_img + cl_img
# copy tframe so masking detections below does not corrupt the live thermal data
tframe_copy = tframe.copy()
if (detections is not None) and (tframe is not None):
# check if has person
has_person = 0
for detection in detections:
if labels[detection.ClassID] == 'Person':
has_person += 1
# print("person + 1")
# change buzz_key to false if see person
if has_person > 0:
buzz_key = False
# go through each detection
for detection in detections:
box = np.array([detection.Left, detection.Top, detection.Right, detection.Bottom])/Ratio
(startX, startY, endX, endY) = box.astype("int")
if startX>0 : startX -= 1
if startY>0 : startY -= 1
if endX <32 : endX += 1
if endY <24 : endY += 1
# mask out the detection region (padded by a margin) so the BACKGROUND max ignores it
sX = max(startX - 10, 0)
sY = max(startY - 50, 0) # extra headroom above the box
eX = min(endX + 10, 31)
eY = min(endY + 10, 23)
tframe_copy[sY:eY, sX:eX] *= -1
# true_temp = 1.68 * read_temp - 7.46
tmax = tframe[startY:endY, startX:endX].max() * 1.68 - 7.46
text = labels[detection.ClassID] + " Tmax={:.1f} C".format(tmax)
font.OverlayText(img, img.width, img.height, text, int(detection.Left), int(detection.Top), font.White, font.Gray20)
# check if need alarm
if tmax >= alarm_temp[detection.ClassID]:
overheat_obj = text
s_key = True
if s_key == True:
cv2.rectangle(cv_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(cv_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
cv2.rectangle(ov_img, (int(detection.Left), int(detection.Top)), (int(detection.Right), int(detection.Bottom)), (0,0, 255), 2)
cv2.putText(ov_img, text, (int(detection.Left), int(detection.Top)+30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)
# check BACKGROUND tmax
tmax_bg = tframe_copy.max() * 1.68 - 7.46
if tmax_bg >= alarm_temp[0]:
overheat_obj = "BACKGROUND" + " Tmax={:.1f} C".format(tmax_bg)
s_key = True
# line notify + alarm
# re-converting here picks up the labels drawn by font.OverlayText, but it also
# discards the red rectangles drawn above (possible bug)
np_img = jetson.utils.cudaToNumpy(img, WinW, WinH, 3) # np_img: 3-channel numpy view of the cuda frame
cv_img = cv2.cvtColor(np_img.astype(np.uint8), cv2.COLOR_RGB2BGR) # cv_img: opencv image
cl_img = cv2.resize(c_img, (WinW,WinH), interpolation=cv2.INTER_CUBIC) # cl_img: large color mapped thermal image
ov_img = cv2.addWeighted(cv_img, alpha, cl_img, 1-alpha, 0) # ov_img: cv_img + cl_img
if (detections is not None) and (tframe is not None):
has_person = 0
for detection in detections:
if labels[detection.ClassID] == 'Person':
has_person += 1
# print("person + 1")
if has_person > 0: s_key = False
if s_key == True:
# start buzzer
buzz_key = True
# save cv_img
fname = 'cv_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, cv_img)
print('Saving image ', fname)
# line message: post the alarm image, closing the file handle afterwards
LINE_MSG = 'Overheating Object Detected : ' + overheat_obj
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
print('sent')
# save ov_img
fname = 'ov_' + dt.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.jpg'
cv2.imwrite(fname, ov_img)
print('Saving thermal overlay image ', fname)
# line message: post the overlay image, closing the file handle afterwards
LINE_MSG = 'Thermal Overlay Image'
LINE_HEADERS = {'Authorization': 'Bearer ' + LINE_ACCESS_TOKEN}
with open(fname, 'rb') as img_file:
    r = requests.post(url, headers=LINE_HEADERS,
                      files={'imageFile': img_file},
                      data={'message': LINE_MSG})
print('sent')
s_key = False
overheat_obj = "Default"
# buzzer
if buzz_key:
    GPIO.output(pin_buzzer, GPIO.HIGH)
else:
    GPIO.output(pin_buzzer, GPIO.LOW)
output.Render(img)
# print the detections
# print("detected {:d} objects in image".format(len(detections)))
# for detection in detections:
# print(detection)
# print out performance info
# net.PrintProfilerTimes()
# exit on input/output EOS
if not input.IsStreaming() or not output.IsStreaming():
break
# update the title bar
output.SetStatus("{:s} | Network {:.0f} FPS".format(opt.network, net.GetNetworkFPS()))
cv2.destroyAllWindows()
GPIO.cleanup()
"""
import Jetson.GPIO as GPIO
import time
pin_buzzer=12
GPIO.setmode(GPIO.BOARD)
GPIO.setup(pin_buzzer,GPIO.OUT, initial=GPIO.LOW)
while True:
if input() == "s": # start buzzer
GPIO.output(pin_buzzer, GPIO.HIGH)
if input() == "p": # pause
GPIO.output(pin_buzzer, GPIO.LOW)
if input() == "q":
GPIO.output(pin_buzzer, GPIO.LOW)
break
GPIO.cleanup()
"""