# Edge Detection Sobel Filter

```python=
#!/usr/bin/env python3
import sys
import vpi
import numpy as np
from PIL import Image
from argparse import ArgumentParser

# argparse makes it easy to write user-friendly command-line interfaces:
# it defines the needed arguments, figures out how to parse them from sys.argv,
# and generates help and error messages when invalid arguments are passed

# parse command-line arguments
parser = ArgumentParser()
parser.add_argument('backend', choices=['cpu', 'cuda'],
                    help='Backend used for processing')
parser.add_argument('input',
                    help='Image to be used for processing')
args = parser.parse_args()

if args.backend == 'cpu':
    backend = vpi.Backend.CPU
else:
    backend = vpi.Backend.CUDA

# load image into a vpi.Image
try:
    input = vpi.asimage(np.asarray(Image.open(args.input)))
except IOError:
    sys.exit("Input file not found")
except:
    sys.exit("Error with input file")

# convert to grayscale
input = input.convert(vpi.Format.U8, backend=vpi.Backend.CUDA)

# edge detection kernel
kernel = [[ 1, 0, -1],
          [ 0, 0,  0],
          [-1, 0,  1]]

# run the convolution using the chosen backend
with backend:
    output = input.convolution(kernel, border=vpi.Border.ZERO)

# save result to disk
Image.fromarray(output.cpu()).save(
    'edges_python' + str(sys.version_info[0]) + '_' + args.backend + '.png')
```

## Preprocessed Image
![kodim01](https://hackmd.io/_uploads/ByQA4tW7ll.png)

## Postprocessed Image
![edges_python3_cuda](https://hackmd.io/_uploads/BkgxBY-Xel.png)
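The 3×3 kernel in the sample is a single fixed edge-detection kernel; the classic Sobel operator named in the title uses a pair of horizontal/vertical kernels. As a minimal sketch (not part of the sample: the input file name `kodim01.png` is an assumption, and since the image is U8 the convolution clamps negative responses to zero, so each response keeps only one gradient polarity), the Sobel pair can be run through the same `convolution` API and combined into a gradient magnitude:

```python
#!/usr/bin/env python3
import numpy as np
import vpi
from PIL import Image

# classic Sobel kernels for horizontal and vertical gradients
sobel_x = [[-1, 0, 1],
           [-2, 0, 2],
           [-1, 0, 1]]
sobel_y = [[-1, -2, -1],
           [ 0,  0,  0],
           [ 1,  2,  1]]

with vpi.Backend.CUDA:
    # load and convert to 8-bit grayscale (file name is an assumption)
    gray = vpi.asimage(np.asarray(Image.open('kodim01.png'))).convert(vpi.Format.U8)
    # U8 output clamps negative responses to zero (one gradient polarity each)
    gx = gray.convolution(sobel_x, border=vpi.Border.ZERO).cpu().astype(np.float32)
    gy = gray.convolution(sobel_y, border=vpi.Border.ZERO).cpu().astype(np.float32)

# combine the two responses into an 8-bit gradient magnitude
mag = np.clip(np.hypot(gx, gy), 0, 255).astype(np.uint8)
Image.fromarray(mag).save('sobel_magnitude.png')
```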
## Disparity computation

```python=
#!/usr/bin/python3
import sys
import vpi
import numpy as np
from PIL import Image
from argparse import ArgumentParser
import cv2

# read a raw 16-bit file and convert it to a PIL image object
def read_file(fpath, resize_to=None, verbose=False):
    try:
        if verbose:
            print(f"Image reading: {fpath}", end=" ", flush=True)
        # open file in binary read mode; count=-1 reads all data
        with open(fpath, 'rb') as my_file:
            np_arr = np.fromfile(my_file, dtype=np.uint16, count=-1)
        if verbose:
            print(f"Done reading: img shape {np_arr.shape}, image type {np_arr.dtype}")
        if resize_to is not None:
            np_arr = np_arr.reshape(resize_to, order='C')
            if verbose:
                print(f"Image reshaped to {np_arr.shape}, dtype {np_arr.dtype}")
        # convert to PIL image object
        pil_img = Image.fromarray(np_arr, mode="I;16L")
        return pil_img
    except:
        raise ValueError("Cannot process raw input")

# process CLI arguments
def process_arguments():
    parser = ArgumentParser()
    parser.add_argument('backend', choices=['cuda', 'cpu'],
                        help='Backend for processing')
    parser.add_argument('left', help='Rectified left input from stereo pair')
    parser.add_argument('right', help='Rectified right input from stereo pair')
    parser.add_argument('--width', default=-1, type=int, help='Input width for raw input files')
    parser.add_argument('--height', default=-1, type=int, help='Input height for raw input files')
    parser.add_argument('--downscale', default=1, type=int, help='Output downscale factor')
    parser.add_argument('--window_size', default=5, type=int, help='Median filter window size')
    parser.add_argument('--skip_confidence', default=False, action='store_true', help='Skip confidence map computation')
    parser.add_argument('--conf_threshold', default=32767, type=int, help='Confidence threshold')
    parser.add_argument('--conf_type', default='best', choices=['best', 'absolute', 'relative', 'inference'],
                        help='Computation used to produce the confidence output')
    parser.add_argument('-p1', default=3, type=int, help='P1 penalty for small disparities')
    parser.add_argument('-p2', default=48, type=int, help='P2 penalty for large disparities')
    parser.add_argument('--p2_alpha', default=0, type=int, help='Alpha for the adaptive P2 penalty')
    parser.add_argument('--uniqueness', default=-1, type=float, help='Uniqueness ratio')
    parser.add_argument('--skip_diagonal', default=False, action='store_true', help='Do not use diagonal paths')
    parser.add_argument('--num_passes', default=3, type=int, help='Number of passes')
    parser.add_argument('--min_disparity', default=0, type=int, help='Minimum disparity')
    parser.add_argument('--max_disparity', default=256, type=int, help='Maximum disparity')
    parser.add_argument('--output_mode', default=0, type=int, help='0: color, 1: grayscale, 2: raw binary')
    parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Verbose mode')
    return parser.parse_args()

# main function
def main():
    args = process_arguments()

    scale = 1

    if args.backend == 'cuda':
        backend = vpi.Backend.CUDA
    elif args.backend == 'cpu':
        backend = vpi.Backend.CPU
    else:
        raise ValueError(f"Invalid backend {args.backend}")

    if args.conf_type in ('best', 'absolute'):
        conftype = vpi.ConfidenceType.ABSOLUTE
    elif args.conf_type == 'relative':
        conftype = vpi.ConfidenceType.RELATIVE
    elif args.conf_type == 'inference':
        conftype = vpi.ConfidenceType.INFERENCE
    else:
        raise ValueError(f"Invalid confidence type {args.conf_type}")

    minDisparity = args.min_disparity
    maxDisparity = args.max_disparity
    includeDiagonals = not args.skip_diagonal
    numPasses = args.num_passes
    calcConf = not args.skip_confidence
    downscale = args.downscale
    windowSize = args.window_size
    quality = 6

    if args.verbose:
        print(f'I Backend: {backend}\nI Left image: {args.left}\nI Right image: {args.right}\n'
              f'I Disparities (min, max): {(minDisparity, maxDisparity)}\n'
              f'I Input scale factor: {scale}\nI Output downscale factor: {downscale}\n'
              f'I Window size: {windowSize}\nI Quality: {quality}\n'
              f'I Calculate confidence: {calcConf}\nI Confidence threshold: {args.conf_threshold}\n'
              f'I Confidence type: {conftype}\nI Uniqueness ratio: {args.uniqueness}\n'
              f'I Penalty P1: {args.p1}\nI Penalty P2: {args.p2}\nI Adaptive P2 alpha: {args.p2_alpha}\n'
              f'I Include diagonals: {includeDiagonals}\nI Number of passes: {numPasses}\n'
              f'I Output mode: {args.output_mode}\nI Verbose: {args.verbose}\n',
              end='', flush=True)

    # treat the input as raw binary if its file name contains 'raw'
    if 'raw' in args.left:
        # convert the raw file to a PIL image object
        pil_left = read_file(args.left, resize_to=[args.height, args.width])
        np_left = np.asarray(pil_left)
    else:
        try:
            # open image
            pil_left = Image.open(args.left)
            if pil_left.mode == 'I':
                np_left = np.asarray(pil_left).astype(np.int16)
            else:
                np_left = np.asarray(pil_left)
        except:
            raise ValueError(f"Cannot open left image {args.left}")

    if 'raw' in args.right:
        # convert the raw file to a PIL image object
        pil_right = read_file(args.right, resize_to=[args.height, args.width])
        np_right = np.asarray(pil_right)
    else:
        try:
            # open image
            pil_right = Image.open(args.right)
            if pil_right.mode == 'I':
                np_right = np.asarray(pil_right).astype(np.int16)
            else:
                np_right = np.asarray(pil_right)
        except:
            raise ValueError(f"Cannot open right image {args.right}")

    # independent left and right preprocessing streams
    streamLeft = vpi.Stream()
    streamRight = vpi.Stream()

    # load images into VPI and convert to grayscale
    with vpi.Backend.CUDA:
        with streamLeft:
            left = vpi.asimage(np_left).convert(vpi.Format.Y16_ER, scale=scale)
        with streamRight:
            right = vpi.asimage(np_right).convert(vpi.Format.Y16_ER, scale=scale)

    if args.verbose:
        print(f'I Input left image: {left.size} {left.format}\n'
              f'I Input right image: {right.size} {right.format}', flush=True)

    confidenceU16 = None

    if calcConf:
        if args.backend != 'cuda':
            calcConf = False
            if args.verbose:
                print(f'W {args.backend} does not allow confidence computation', flush=True)

    outWidth = (left.size[0] + downscale - 1) // downscale
    outHeight = (left.size[1] + downscale - 1) // downscale

    if calcConf:
        confidenceU16 = vpi.Image((outWidth, outHeight), vpi.Format.U16)

    # reuse the left stream for the stereo stage
    streamStereo = streamLeft

    # disparity estimation
    with streamStereo, backend:
        disparityS16 = vpi.stereodisp(left, right, downscale=downscale, out_confmap=confidenceU16,
                                      window=windowSize, maxdisp=maxDisparity,
                                      confthreshold=args.conf_threshold, quality=quality,
                                      conftype=conftype, mindisp=minDisparity,
                                      p1=args.p1, p2=args.p2, p2alpha=args.p2_alpha,
                                      uniqueness=args.uniqueness,
                                      includediagonals=includeDiagonals, numpasses=numPasses)

    if args.verbose:
        print('done!\nI Post-processing ...', end=' ', flush=True)

    # post-process and store to disk
    with streamStereo, vpi.Backend.CUDA:
        if disparityS16.format == vpi.Format.S16_BL:
            disparityS16 = disparityS16.convert(vpi.Format.S16, backend=vpi.Backend.VIC)

        # normalize the disparity map to 8 bits
        disparityU8 = disparityS16.convert(vpi.Format.U8, scale=255.0 / (32 * maxDisparity)).cpu()

        # apply a color map
        disparityColor = cv2.applyColorMap(disparityU8, cv2.COLORMAP_JET)

        # convert to RGB
        disparityColor = cv2.cvtColor(disparityColor, cv2.COLOR_BGR2RGB)

        # apply the confidence map
        if calcConf:
            confidenceU8 = confidenceU16.convert(vpi.Format.U8, scale=255.0 / 65535).cpu()

            # pixels with zero confidence are painted black in the disparity map
            _, mask = cv2.threshold(confidenceU8, 1, 255, cv2.THRESH_BINARY)
            mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
            disparityColor = cv2.bitwise_and(disparityColor, mask)

    tend = '.raw' if args.output_mode == 2 else '.png'
    disparity_fname = 'disparity_python' + tend
    confidence_fname = 'confidence_python' + tend

    # save results to disk
    try:
        if args.output_mode == 0:
            Image.fromarray(disparityColor).save(disparity_fname)
        elif args.output_mode == 1:
            Image.fromarray(disparityU8).save(disparity_fname)
        elif args.output_mode == 2:
            disparityS16.cpu().tofile(disparity_fname)
        if calcConf:
            if args.output_mode == 2:
                confidenceU16.cpu().tofile(confidence_fname)
            else:
                Image.fromarray(confidenceU8).save(confidence_fname)
    except:
        raise ValueError('Error encountered while saving results')

if __name__ == '__main__':
    main()
```
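With `--output_mode 2` the script writes the raw S16 disparity buffer to disk. The `scale=255.0/(32 * maxDisparity)` normalization above reflects that VPI stores disparities in fixed point with 5 fractional bits (hence the factor 32). As a minimal sketch (not part of the sample; the focal length, baseline, and output size below are stand-in values for your own calibration and downscale settings), the raw file can be turned into a metric depth map via Z = f * B / d:

```python
import numpy as np

# stand-in calibration values; replace with your own rig's parameters
focal_px = 700.0            # focal length in pixels
baseline_m = 0.54           # stereo baseline in meters
width, height = 960, 576    # disparity map size (input size / downscale)

# read the raw S16 output written with --output_mode 2
disp_fixed = np.fromfile('disparity_python.raw', dtype=np.int16).reshape(height, width)
disp_px = disp_fixed.astype(np.float32) / 32.0   # 5 fractional bits -> pixels

# depth Z = f * B / d, guarding against zero/invalid disparities
valid = disp_px > 0
depth_m = np.zeros_like(disp_px)
depth_m[valid] = focal_px * baseline_m / disp_px[valid]
```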
## Stereo Lidar Early Fusion

```python
#!/usr/bin/env python3
import numpy as np
import cv2
import matplotlib.pyplot as plt
import statistics
import random

'''
Fusion levels:
can be grouped as early fusion, mid-level fusion and late fusion
'''

'''
In early fusion, raw sensor data is combined before processing.
In this case lidar point clouds are projected onto the camera image plane
using intrinsic and extrinsic calibration to create a single dataset
'''

class LiDAR_CAM(object):
    # constructor
    def __init__(self, calib_file):
        calibs = self.read_calib_files(calib_file)
        # camera projection matrix
        P = calibs["P2"]
        self.P = np.reshape(P, [3, 4])
        # transformation from velodyne to camera coordinates
        V2C = calibs["Tr_velo_to_cam"]
        self.V2C = np.reshape(V2C, [3, 4])
        # rectifying rotation matrix
        R0 = calibs["R0_rect"]
        self.R0 = np.reshape(R0, [3, 3])

    ''' Read calibration files '''
    def read_calib_files(self, fpath):
        'read calib files and parse them into a dictionary'
        data = {}
        with open(fpath, "r") as f:
            for line in f.readlines():
                line = line.rstrip()
                # skip empty lines
                if len(line) == 0:
                    continue
                key, value = line.split(":", 1)
                try:
                    data[key] = np.array([float(a) for a in value.split()])
                except ValueError:
                    print("Error encountered !!")
        return data

    '''
    Transform from Cartesian to homogeneous coordinates.
    Homogeneous coordinates are used in camera matrices to represent and perform
    perspective projections and transformations, including translations, rotations,
    and scaling, as linear operations within a 4D space
    '''
    def cartesian_homogeneous(self, pts_3D):
        n = pts_3D.shape[0]
        pts_3d_homog = np.hstack((pts_3D, np.ones((n, 1))))
        return pts_3d_homog
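    # A hedged illustration (not part of the original class) of why the
    # homogeneous form is convenient: with a rotation R and translation t,
    # the affine map R @ p + t becomes a single linear 4x4 map,
    #
    #   [[R, t],    [[p],
    #    [0, 1]]  @  [1]]
    #
    # so p = (x, y, z) is augmented to (x, y, z, 1) before multiplying.
    # project_velo_to_image below builds exactly such 4x4 matrices from
    # R0_rect and Tr_velo_to_cam before composing them with P2.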
") return data ''' Transform from cartesian homogeneous coords Homogeneous coordinates are used in camera matrices to represent and perform perspective projections and transformations, including translations, rotations, and scaling, as linear operations within a 4D space ''' def cartesian_homogeneous(self, pts_3D): n = pts_3D.shape[0] pts_3d_homog = np.hstack((pts_3D, np.ones(n,1))) return pts_3d_homog ''' Project from velodyne lidar to camera frame ''' def project_velo_to_image(self, pts_3d_velo): R0_homo = np.stack([self.R0, [0, 0, 0]]) # homogeneous coordinate R0_homo_1 = np.hstack([R0_homo, [0, 0, 0, 1]]) # form projection matrix Proj = np.dot(self.P, R0_homo_1) #complete projection matrix p_ro_rt = np.dot(Proj, np.vstack(self.V2C, [0,0,0, 1])) #convert lidar to homogeneous coordinate pts_3d_homo = np.column_stack([pts_3d_velo, np.ones((pts_3d_velo[0], 1))]) #perform the projection p_ro_rt_x = np.dot(p_ro_rt, np.transpose(pts_3d_homo)) pts_2d = np.transpose(p_ro_rt_x) #convert homogeneous image coordinates to 2D pixel coordinates pts_2d[:, 0] /= pts_2d[:,2] pts_2d[:, 1] /= pts_2d[:,2] return pts_2d[:, 0:2] ''' Perform filtering of lidar points to ensure only valid regions are retained''' def get_lidar_in_image_fov(self, pc_velo, xmin, ymin, xmax, ymax, return_more=False, clip_distance=2.0): ''' Perform the 3d mapping and ensure lidar points to be in FOV ''' pts_2d = self.project_velo_to_image(pc_velo) #boolean logic for valid image regions fov_indices = ( (pts_2d[:, 0] < xmax ) & (pts_2d[:, 0] >= xmin) & (pts_2d[:, 1] >= ymax) & (pts_2d[:, 1] >= ymin) ) fov_inds = fov_indices & (pc_velo[:,0] > clip_distance) imgfov_pc_velo = pc_velo[fov_inds, :] if return_more: return imgfov_pc_velo, pts_2d, fov_inds else: return imgfov_pc_velo ''' Project filtered lidar points to image ''' def show_lidar_on_image(self, pc_velo, img): # start with filtering imgfov_pc_velo, pts_2d, fov_inds = self.get_lidar_in_image_fov(pc_velo, 0, 0, img.shape[1], img.shape[0], return_more=True, clip_distance=2.0) self.imgfov_pc_velo = imgfov_pc_velo self.imgfov_pts_2d = pts_2d[fov_inds] cmap = np.array([plt.cm.hsv(i)[:3] for i in range(256)]) * 255 for point, depth in zip(self.imgfov_pc_velo, self.imgfov_pts_2d[:,0]): color = cmap[int(min(255, 510.0/max(depth, 0.1)))] cv2.circle(img, (int(round(point[0])), int(round(point[1]))), 2, color=tuple(color), thickness=-1) return img ''' Check if a point lies within the bounding box''' def rectContains(self, rect, pt, w, h, shrink_factor=0): x1 = int(rect[0] * w - rect[2] * w * 0.5 * (1 - shrink_factor)) # center_x - width /2 * shrink_factor y1 = int(rect[1] * h - rect[3] * h * 0.5 * (1 - shrink_factor)) # center_y - height /2 * shrink_factor x2 = int(rect[0] * w + rect[2] * w * 0.5 * (1 - shrink_factor)) # center_x + width/2 * shrink_factor y2 = int(rect[1] * h + rect[3] * h * 0.5 * (1 - shrink_factor)) return x1 < pt[0] < x2 and y1 < pt[1] < y2 ''' shrinking eliminates poinst barely in the box''' ''' Remove the outliers in dataset by computing the mean and comparing standard deviation''' def filter_outliers(self, distances): #list to store valid points inliers = [] my_mean = statistics.mean(distances) my_std = statistics.stdev(distances) for x in distances: if abs( x - my_mean) < my_std: inliers.append(x) return inliers ''' Compute best metric distances The lidar function gives multiple 3d points per object and we wish to know which distance is optimal to use for tracking and displaying ''' def get_best_distance(self, distances, technique="closest"): if technique == 
"closest": return min(distances) elif technique == "average": return statistics.mean(distances) elif technique == "random": return random.choice(distances) else: return statistics.median(sorted(distances)) ''' Lidar to camera fusion ''' def lidar_camera_fusion(self, pred_bboxes, image): # make image copy preserve the original img_cp = image.copy() # color map cmap = np.array([plt.cm.hsv(i)[:3] for i in range(256)]) * 255 for box in pred_bboxes: distances = [] # loop over all the lidar points for pts_2d, depth in zip(self.imgfov_pts_2d, self.imgfov_pc_velo[:,0]): if self.rectContains(box, pts_2d, image.shape[1], image.shape[0], shrink_factor=0.2): distances.append(depth) #assign color corresponding to depth color_idx = int(min(255, 510.0/max(depth, 0.1))) color = tuple(cmap[color_idx].astype(int)) cv2.circle(img_cp, (int(round(pts_2d[0])), int(round(pts_2d[1]))), 2, color, thickness=-1) #annotate distance if enough lidar points are inside box if len(distances) > 2: distances = self.filter_outliers(distances) best_distances = self.get_best_distance(distances, technique="average") x,y = int(box[0] * image.shape[1], int(box[1] * image.shape[0])) cv2.putText(img_cp, f"{best_distances : .2f} m", (x, y), cv2.FONT_HERSHEY_PLAIN, 0.75, (255, 0, 0), 2) return img_cp, distances ```