# Edge Detection: Sobel Filter
```python=
#!/usr/bin/env python3
import sys
import vpi
import numpy as np
from PIL import Image
from argparse import ArgumentParser

# argparse makes it easy to write user-friendly command-line interfaces:
# you declare the needed arguments, it figures out how to parse them from
# sys.argv, and it generates help and error messages when invalid arguments
# are passed.

# parse command-line arguments
parser = ArgumentParser()
parser.add_argument('backend', choices=['cpu', 'cuda'],
                    help='Backend used for processing')
parser.add_argument('input', help='Image to be used for processing')
args = parser.parse_args()

if args.backend == 'cpu':
    backend = vpi.Backend.CPU
else:
    backend = vpi.Backend.CUDA

# load the image into a vpi.Image
try:
    input = vpi.asimage(np.asarray(Image.open(args.input)))
except IOError:
    sys.exit("Input file not found")
except Exception:
    sys.exit("Error with input file")

# convert to grayscale (the conversion itself runs on the CUDA backend)
input = input.convert(vpi.Format.U8, backend=vpi.Backend.CUDA)

# 3x3 edge-detection kernel; note this is the simple diagonal-edge kernel from
# the VPI convolution sample, not a true Sobel operator
# (a Sobel Gx kernel would be [[1, 0, -1], [2, 0, -2], [1, 0, -1]])
kernel = [[ 1, 0, -1],
          [ 0, 0,  0],
          [-1, 0,  1]]

# run the convolution on the chosen backend
with backend:
    output = input.convolution(kernel, border=vpi.Border.ZERO)

# save the result to disk
Image.fromarray(output.cpu()).save(
    'edges_python' + str(sys.version_info[0]) + '_' + args.backend + '.png')
```
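As a quick sanity check, the same convolution can be reproduced with OpenCV and compared against the file the script writes. This is a minimal sketch, not part of the VPI sample: the `input.png` and `edges_python3_cuda.png` file names are placeholders (the latter must match what the script actually produced), and small differences are possible because each library rounds and saturates to U8 in its own way.
```python
#!/usr/bin/env python3
# Cross-check the VPI convolution result with OpenCV's filter2D.
# Assumes the VPI script was already run, e.g.: python3 main.py cuda input.png
import numpy as np
import cv2

kernel = np.array([[ 1, 0, -1],
                   [ 0, 0,  0],
                   [-1, 0,  1]], dtype=np.float32)

# placeholder file names -- adjust to your actual input/output
gray = cv2.imread('input.png', cv2.IMREAD_GRAYSCALE)

# BORDER_CONSTANT matches vpi.Border.ZERO; filter2D correlates, so flip the
# kernel to get a true convolution (this particular kernel is 180°-symmetric)
ref = cv2.filter2D(gray, -1, cv2.flip(kernel, -1), borderType=cv2.BORDER_CONSTANT)

vpi_out = cv2.imread('edges_python3_cuda.png', cv2.IMREAD_GRAYSCALE)
print('max abs difference:', np.abs(ref.astype(int) - vpi_out.astype(int)).max())
```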
## Preprocessed Image

## Postprocessed Image

## Disparity computation
```python=
#!/usr/bin/python3
import sys
import vpi
import numpy as np
from PIL import Image
from argparse import ArgumentParser
import cv2

# read a raw 16-bit grayscale file and convert it to a PIL image object
def read_file(fpath, resize_to=None, verbose=False):
    try:
        if verbose:
            print(f"Reading image: {fpath}", end=" ", flush=True)
        # open the file in binary read mode
        with open(fpath, 'rb') as my_file:
            # construct an array from the data; count=-1 reads everything
            np_arr = np.fromfile(my_file, dtype=np.uint16, count=-1)
        if verbose:
            print(f"done; shape {np_arr.shape}, dtype {np_arr.dtype}")
        if resize_to is not None:
            np_arr = np_arr.reshape(resize_to, order='C')
            if verbose:
                print(f"Image reshaped to {np_arr.shape}, dtype {np_arr.dtype}")
        # convert to a PIL image object (16-bit little-endian grayscale)
        return Image.fromarray(np_arr, mode="I;16")
    except Exception:
        raise ValueError("Cannot process raw input")
# process command-line arguments
def process_arguments():
    parser = ArgumentParser()
    parser.add_argument('backend', choices=['cuda', 'cpu'],
                        help='Backend for processing')
    parser.add_argument('left', help='Rectified left input image from the stereo pair')
    parser.add_argument('right', help='Rectified right input image from the stereo pair')
    parser.add_argument('--width', default=-1, type=int, help='Input width for raw input files')
    parser.add_argument('--height', default=-1, type=int, help='Input height for raw input files')
    parser.add_argument('--downscale', default=1, type=int, help='Output downscale factor')
    parser.add_argument('--window_size', default=5, type=int, help='Median filter window size')
    parser.add_argument('--skip_confidence', default=False, action='store_true', help='Skip confidence map computation')
    parser.add_argument('--conf_threshold', default=32767, type=int, help='Confidence threshold')
    parser.add_argument('--conf_type', default='best', choices=['best', 'absolute', 'relative', 'inference'], help='Computation used to produce the confidence output')
    parser.add_argument('-p1', default=3, type=int, help='Penalty P1 on small disparity differences')
    parser.add_argument('-p2', default=48, type=int, help='Penalty P2 on large disparity differences')
    parser.add_argument('--p2_alpha', default=0, type=int, help='Alpha for the adaptive P2 penalty')
    parser.add_argument('--uniqueness', default=-1, type=float, help='Uniqueness ratio')
    parser.add_argument('--skip_diagonal', default=False, action='store_true', help='Do not use diagonal paths')
    parser.add_argument('--num_passes', default=3, type=int, help='Number of passes')
    parser.add_argument('--min_disparity', default=0, type=int, help='Minimum disparity')
    parser.add_argument('--max_disparity', default=256, type=int, help='Maximum disparity')
    parser.add_argument('--output_mode', default=0, type=int, help='0: color, 1: grayscale, 2: raw binary')
    parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Verbose mode')
    return parser.parse_args()
# main function
def main():
    args = process_arguments()
    scale = 1
    if args.backend == 'cuda':
        backend = vpi.Backend.CUDA
    elif args.backend == 'cpu':
        backend = vpi.Backend.CPU
    else:
        raise ValueError(f"Invalid backend: {args.backend}")

    conftype = None
    if args.conf_type == 'best':
        # treat 'best' as the absolute confidence computation
        conftype = vpi.ConfidenceType.ABSOLUTE
    elif args.conf_type == 'absolute':
        conftype = vpi.ConfidenceType.ABSOLUTE
    elif args.conf_type == 'relative':
        conftype = vpi.ConfidenceType.RELATIVE
    elif args.conf_type == 'inference':
        conftype = vpi.ConfidenceType.INFERENCE
    else:
        raise ValueError(f"Invalid confidence type: {args.conf_type}")

    minDisparity = args.min_disparity
    maxDisparity = args.max_disparity
    includeDiagonals = not args.skip_diagonal
    numPasses = args.num_passes
    calcConf = not args.skip_confidence
    downscale = args.downscale
    windowSize = args.window_size
    quality = 6

    if args.verbose:
        print(f'I Backend: {backend}\nI Left image: {args.left}\nI Right image: {args.right}\n'
              f'I Disparities (min, max): {(minDisparity, maxDisparity)}\n'
              f'I Input scale factor: {scale}\nI Output downscale factor: {downscale}\n'
              f'I Window size: {windowSize}\nI Quality: {quality}\n'
              f'I Calculate confidence: {calcConf}\nI Confidence threshold: {args.conf_threshold}\n'
              f'I Confidence type: {conftype}\nI Uniqueness ratio: {args.uniqueness}\n'
              f'I Penalty P1: {args.p1}\nI Penalty P2: {args.p2}\nI Adaptive P2 alpha: {args.p2_alpha}\n'
              f'I Include diagonals: {includeDiagonals}\nI Number of passes: {numPasses}\n'
              f'I Output mode: {args.output_mode}\nI Verbose: {args.verbose}\n',
              end='', flush=True)

    # treat the input as raw binary if the file name contains 'raw'
    if 'raw' in args.left:
        # convert the raw file to a PIL image object
        pil_left = read_file(args.left, resize_to=[args.height, args.width])
        np_left = np.asarray(pil_left)
    else:
        try:
            # open the image with PIL
            pil_left = Image.open(args.left)
            if pil_left.mode == 'I':
                np_left = np.asarray(pil_left).astype(np.int16)
            else:
                np_left = np.asarray(pil_left)
        except Exception:
            raise ValueError(f"Cannot open left image {args.left}")

    if 'raw' in args.right:
        # convert the raw file to a PIL image object
        pil_right = read_file(args.right, resize_to=[args.height, args.width])
        np_right = np.asarray(pil_right)
    else:
        try:
            # open the image with PIL
            pil_right = Image.open(args.right)
            if pil_right.mode == 'I':
                np_right = np.asarray(pil_right).astype(np.int16)
            else:
                np_right = np.asarray(pil_right)
        except Exception:
            raise ValueError(f"Cannot open right image {args.right}")
    # independent left and right preprocessing streams
    streamLeft = vpi.Stream()
    streamRight = vpi.Stream()

    # upload the images to VPI and convert them to grayscale
    with vpi.Backend.CUDA:
        with streamLeft:
            left = vpi.asimage(np_left).convert(vpi.Format.Y16_ER, scale=scale)
        with streamRight:
            right = vpi.asimage(np_right).convert(vpi.Format.Y16_ER, scale=scale)

    if args.verbose:
        print(f'I Input left image: {left.size} {left.format}\n'
              f'I Input right image: {right.size} {right.format}', flush=True)

    confidenceU16 = None
    if calcConf and args.backend != 'cuda':
        # confidence computation is only available on the CUDA backend
        calcConf = False
        if args.verbose:
            print(f'W {args.backend} does not allow confidence computation', flush=True)

    outWidth = (left.size[0] + downscale - 1) // downscale
    outHeight = (left.size[1] + downscale - 1) // downscale
    if calcConf:
        confidenceU16 = vpi.Image((outWidth, outHeight), vpi.Format.U16)

    # reuse the left stream for the stereo stage
    streamStereo = streamLeft

    # disparity estimation
    with streamStereo, backend:
        disparityS16 = vpi.stereodisp(left, right, downscale=downscale, out_confmap=confidenceU16,
                                      window=windowSize, maxdisp=maxDisparity,
                                      confthreshold=args.conf_threshold,
                                      quality=quality, conftype=conftype, mindisp=minDisparity,
                                      p1=args.p1, p2=args.p2, p2alpha=args.p2_alpha,
                                      uniqueness=args.uniqueness, includediagonals=includeDiagonals,
                                      numpasses=numPasses)

    if args.verbose:
        print('done!\nI Post-processing...', end=' ', flush=True)

    # post-process and store the result to disk
    with streamStereo, vpi.Backend.CUDA:
        # block-linear output must be converted to pitch-linear first
        if disparityS16.format == vpi.Format.S16_BL:
            disparityS16 = disparityS16.convert(vpi.Format.S16, backend=vpi.Backend.VIC)

        # normalize: disparity is S16 in Q10.5 fixed point, hence the factor of 32
        disparityU8 = disparityS16.convert(vpi.Format.U8, scale=255.0 / (32 * maxDisparity)).cpu()

        # apply a color map for visualization
        disparityColor = cv2.applyColorMap(disparityU8, cv2.COLORMAP_JET)
        # convert from OpenCV's BGR to RGB
        disparityColor = cv2.cvtColor(disparityColor, cv2.COLOR_BGR2RGB)

        # post-process the confidence map
        if calcConf:
            confidenceU8 = confidenceU16.convert(vpi.Format.U8, scale=255.0 / 65535).cpu()
            # where the pixel confidence is 0, paint the disparity black
            _, mask = cv2.threshold(confidenceU8, 1, 255, cv2.THRESH_BINARY)
            mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
            disparityColor = cv2.bitwise_and(disparityColor, mask)

    ext = '.raw' if args.output_mode == 2 else '.png'
    disparity_fname = 'disparity_python' + ext
    confidence_fname = 'confidence_python.png'  # the confidence map is always saved as PNG

    # save the results to disk
    try:
        if args.output_mode == 0:
            Image.fromarray(disparityColor).save(disparity_fname)
        elif args.output_mode == 1:
            Image.fromarray(disparityU8).save(disparity_fname)
        elif args.output_mode == 2:
            disparityS16.cpu().tofile(disparity_fname)
        if calcConf:
            Image.fromarray(confidenceU8).save(confidence_fname)
    except Exception:
        raise ValueError('Error saving results to disk')

if __name__ == '__main__':
    main()
```
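With `--output_mode 2` the script dumps the S16 disparity as a raw binary file. A common next step is converting it to metric depth via depth = f·B/d. Below is a minimal sketch of that conversion; `FOCAL_PX`, `BASELINE_M`, and the output dimensions are hypothetical placeholders (the raw file has the possibly downscaled size outWidth × outHeight), and the divide-by-32 mirrors the Q10.5 fixed-point scaling used in the script's own normalization.
```python
import numpy as np

# Hypothetical calibration values -- replace with your rig's actual numbers.
FOCAL_PX = 700.0          # focal length in pixels
BASELINE_M = 0.12         # distance between the two cameras in meters
WIDTH, HEIGHT = 960, 540  # size of the disparity map written by the script

# The raw file holds S16 values in Q10.5 fixed point (hence the 32 in the
# script's normalization), so divide by 32 to get disparity in pixels.
disp_q10_5 = np.fromfile('disparity_python.raw', dtype=np.int16).reshape(HEIGHT, WIDTH)
disp_px = disp_q10_5.astype(np.float32) / 32.0

# depth = f * B / disparity; mask out invalid (non-positive) disparities
valid = disp_px > 0
depth_m = np.zeros_like(disp_px)
depth_m[valid] = FOCAL_PX * BASELINE_M / disp_px[valid]
print(f"depth range: {depth_m[valid].min():.2f} .. {depth_m[valid].max():.2f} m")
```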
## Stereo Lidar Early Fusion
```python
#!/usr/bin/env python3
import numpy as np
import cv2
import matplotlib.pyplot as plt
import statistics
import random

''' Fusion levels: approaches can be grouped into early fusion, mid-level fusion, and late fusion. '''
''' In early fusion, raw sensor data is combined before any processing.
    Here, the lidar point cloud is projected onto the camera image plane using the
    intrinsic and extrinsic calibration to create a single dataset.
'''
class LiDAR_CAM(object):
    # constructor
    def __init__(self, calib_file):
        calibs = self.read_calib_files(calib_file)
        # camera projection matrix (P2 in KITTI: left color camera)
        P = calibs["P2"]
        self.P = np.reshape(P, [3, 4])
        # transformation from velodyne to camera coordinates
        V2C = calibs["Tr_velo_to_cam"]
        self.V2C = np.reshape(V2C, [3, 4])
        # rectification rotation matrix
        R0 = calibs["R0_rect"]
        self.R0 = np.reshape(R0, [3, 3])

    ''' Read calibration files '''
    def read_calib_files(self, fpath):
        'Read a calibration file and parse it into a dictionary'
        data = {}
        with open(fpath, "r") as f:
            for line in f.readlines():
                line = line.rstrip()
                # skip empty lines
                if len(line) == 0:
                    continue
                key, value = line.split(":", 1)
                try:
                    data[key] = np.array([float(a) for a in value.split()])
                except ValueError:
                    print("Error parsing calibration line!")
        return data

    ''' Convert from Cartesian to homogeneous coordinates.
        Homogeneous coordinates are used in camera matrices to represent
        and perform perspective projections and transformations,
        including translations, rotations, and scaling,
        as linear operations within a 4D space;
        for example, (x, y, z) becomes (x, y, z, 1).
    '''
    def cartesian_homogeneous(self, pts_3D):
        n = pts_3D.shape[0]
        pts_3d_homog = np.hstack((pts_3D, np.ones((n, 1))))
        return pts_3d_homog

    ''' Project from the velodyne lidar frame to the camera image plane '''
    def project_velo_to_image(self, pts_3d_velo):
        # pad R0 to 4x4 so it can be chained with the other homogeneous matrices
        R0_homo = np.vstack([self.R0, [0, 0, 0]])
        R0_homo_1 = np.column_stack([R0_homo, [0, 0, 0, 1]])
        # form the projection matrix
        Proj = np.dot(self.P, R0_homo_1)
        # complete projection matrix: P2 . R0_rect . Tr_velo_to_cam
        p_r0_rt = np.dot(Proj, np.vstack((self.V2C, [0, 0, 0, 1])))
        # convert the lidar points to homogeneous coordinates
        pts_3d_homo = np.column_stack([pts_3d_velo, np.ones((pts_3d_velo.shape[0], 1))])
        # perform the projection
        p_r0_rt_x = np.dot(p_r0_rt, np.transpose(pts_3d_homo))
        pts_2d = np.transpose(p_r0_rt_x)
        # convert homogeneous image coordinates to 2D pixel coordinates
        pts_2d[:, 0] /= pts_2d[:, 2]
        pts_2d[:, 1] /= pts_2d[:, 2]
        return pts_2d[:, 0:2]
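
    # For reference, project_velo_to_image computes
    #     x = P2 @ R0_rect(4x4) @ Tr_velo_to_cam(4x4) @ X_velo
    # with X_velo = (x, y, z, 1)^T a homogeneous lidar point, and then recovers
    # the pixel coordinates as (u, v) = (x[0] / x[2], x[1] / x[2]).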
    ''' Filter the lidar points so that only those in a valid image region are retained '''
    def get_lidar_in_image_fov(self, pc_velo, xmin, ymin, xmax, ymax, return_more=False, clip_distance=2.0):
        ''' Perform the 3D-to-2D mapping and keep only lidar points inside the image FOV '''
        pts_2d = self.project_velo_to_image(pc_velo)
        # boolean mask for points that project inside the image bounds
        fov_indices = (
            (pts_2d[:, 0] < xmax) & (pts_2d[:, 0] >= xmin) &
            (pts_2d[:, 1] < ymax) & (pts_2d[:, 1] >= ymin)
        )
        # also drop points closer than clip_distance in front of the sensor
        fov_inds = fov_indices & (pc_velo[:, 0] > clip_distance)
        imgfov_pc_velo = pc_velo[fov_inds, :]
        if return_more:
            return imgfov_pc_velo, pts_2d, fov_inds
        else:
            return imgfov_pc_velo

    ''' Draw the filtered lidar points on the image '''
    def show_lidar_on_image(self, pc_velo, img):
        # start with filtering
        imgfov_pc_velo, pts_2d, fov_inds = self.get_lidar_in_image_fov(pc_velo, 0, 0, img.shape[1],
                                                                       img.shape[0], return_more=True,
                                                                       clip_distance=2.0)
        self.imgfov_pc_velo = imgfov_pc_velo
        self.imgfov_pts_2d = pts_2d[fov_inds]
        # color lookup table: depth is mapped onto the HSV colormap
        cmap = np.array([plt.cm.hsv(i)[:3] for i in range(256)]) * 255
        for pt_2d, depth in zip(self.imgfov_pts_2d, self.imgfov_pc_velo[:, 0]):
            color = cmap[int(min(255, 510.0 / max(depth, 0.1)))]
            cv2.circle(img, (int(round(pt_2d[0])), int(round(pt_2d[1]))), 2, color=tuple(color), thickness=-1)
        return img

    ''' Check if a point lies within the (shrunken) bounding box;
        shrinking eliminates points that are barely inside the box '''
    def rectContains(self, rect, pt, w, h, shrink_factor=0):
        # rect is (center_x, center_y, width, height), normalized to [0, 1]
        x1 = int(rect[0] * w - rect[2] * w * 0.5 * (1 - shrink_factor))  # center_x - shrunken width / 2
        y1 = int(rect[1] * h - rect[3] * h * 0.5 * (1 - shrink_factor))  # center_y - shrunken height / 2
        x2 = int(rect[0] * w + rect[2] * w * 0.5 * (1 - shrink_factor))  # center_x + shrunken width / 2
        y2 = int(rect[1] * h + rect[3] * h * 0.5 * (1 - shrink_factor))  # center_y + shrunken height / 2
        return x1 < pt[0] < x2 and y1 < pt[1] < y2

    ''' Remove outliers by keeping only points within one standard deviation of the mean '''
    def filter_outliers(self, distances):
        # list to store valid points
        inliers = []
        my_mean = statistics.mean(distances)
        my_std = statistics.stdev(distances)
        for x in distances:
            if abs(x - my_mean) < my_std:
                inliers.append(x)
        return inliers

    ''' Compute the best metric distance.
        The lidar gives multiple 3D points per object, and we need to decide which
        distance is optimal to use for tracking and display.
    '''
    def get_best_distance(self, distances, technique="closest"):
        if technique == "closest":
            return min(distances)
        elif technique == "average":
            return statistics.mean(distances)
        elif technique == "random":
            return random.choice(distances)
        else:
            return statistics.median(sorted(distances))

    ''' Lidar-to-camera fusion '''
    def lidar_camera_fusion(self, pred_bboxes, image):
        # work on a copy to preserve the original image
        img_cp = image.copy()
        # color lookup table
        cmap = np.array([plt.cm.hsv(i)[:3] for i in range(256)]) * 255
        for box in pred_bboxes:
            distances = []
            # loop over all the lidar points
            for pts_2d, depth in zip(self.imgfov_pts_2d, self.imgfov_pc_velo[:, 0]):
                if self.rectContains(box, pts_2d, image.shape[1], image.shape[0], shrink_factor=0.2):
                    distances.append(depth)
                    # assign a color corresponding to the depth
                    color_idx = int(min(255, 510.0 / max(depth, 0.1)))
                    color = tuple(cmap[color_idx].astype(int))
                    cv2.circle(img_cp, (int(round(pts_2d[0])), int(round(pts_2d[1]))), 2, color, thickness=-1)
            # annotate the distance if enough lidar points fall inside the box
            if len(distances) > 2:
                distances = self.filter_outliers(distances)
                best_distance = self.get_best_distance(distances, technique="average")
                x, y = int(box[0] * image.shape[1]), int(box[1] * image.shape[0])
                cv2.putText(img_cp, f"{best_distance:.2f} m", (x, y), cv2.FONT_HERSHEY_PLAIN, 0.75, (255, 0, 0), 2)
        return img_cp, distances
```
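To exercise the class end to end, here is a minimal usage sketch assuming KITTI-style files; the paths and the detection box are placeholders, and it assumes the `LiDAR_CAM` class above is in the same file or imported. Note that `lidar_camera_fusion` relies on the point cache set up by `show_lidar_on_image`, so the overlay call must come first.
```python
#!/usr/bin/env python3
# Usage sketch for the LiDAR_CAM class above (file names are placeholders).
import numpy as np
import cv2

calib = LiDAR_CAM('calib/000007.txt')

# KITTI velodyne scans are float32 (x, y, z, reflectance); keep only xyz
pc_velo = np.fromfile('velodyne/000007.bin', dtype=np.float32).reshape(-1, 4)[:, :3]
img = cv2.cvtColor(cv2.imread('image_2/000007.png'), cv2.COLOR_BGR2RGB)

# project the lidar points onto the camera image (early fusion overlay);
# this also caches the in-FOV points used by lidar_camera_fusion below
overlay = calib.show_lidar_on_image(pc_velo, img.copy())
cv2.imwrite('lidar_overlay.png', cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR))

# a single hypothetical detection: (center_x, center_y, width, height), normalized
pred_bboxes = [np.array([0.5, 0.6, 0.2, 0.3])]
fused, distances = calib.lidar_camera_fusion(pred_bboxes, img)
cv2.imwrite('fusion_result.png', cv2.cvtColor(fused, cv2.COLOR_RGB2BGR))
```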