"""Inference helpers: media loading, letterboxing and NumPy post-processing."""

import copy
import glob
import os
import re
import time
from pathlib import Path

import cv2
import numpy as np

# from screen_grab import grab


class LoadImages:
    """Iterate over images and/or videos for inference.

    ``path`` may be a glob pattern (contains ``*``), a directory, a single
    file, or a ``/dev/video*`` capture device. Images are yielded first,
    followed by the frames of each video.

    Each iteration yields ``(path, img0, cap)`` where ``img0`` is the BGR
    frame resized to 1280x720 and ``cap`` is the current
    ``cv2.VideoCapture`` (``None`` when no video was found).
    """

    def __init__(self, path, screen=1):
        """Collect the media files referenced by ``path``.

        Raises:
            FileNotFoundError: ``path`` is not a glob, directory, file or
                ``/dev/video*`` device.
            AssertionError: no file with a supported suffix was found.
        """
        self.dev = False
        self.grab_screen = False
        p = str(Path(path).absolute())  # os-agnostic absolute path
        print(p)
        if '*' in p:
            files = sorted(glob.glob(p, recursive=True))  # glob
        elif os.path.isdir(p):
            files = sorted(glob.glob(os.path.join(p, '*.*')))  # dir
        elif os.path.isfile(p):
            files = [p]  # files
        elif p.startswith("/dev/video"):
            files = [p]
            self.dev = True
        # elif "screengrab" in p:
        #     files = [p]
        #     self.grab_screen = True
        else:
            # FileNotFoundError is still an Exception, so existing
            # ``except Exception`` handlers keep working.
            raise FileNotFoundError(f'ERROR: {p} does not exist')

        img_formats = ['bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng',
                       'webp', 'mpo']  # acceptable image suffixes
        vid_formats = ['mov', 'avi', 'mp4', 'mpg', 'mpeg', 'm4v', 'wmv',
                       'mkv']  # acceptable video suffixes
        images = [x for x in files if x.split('.')[-1].lower() in img_formats]
        videos = [x for x in files if x.split('.')[-1].lower() in vid_formats]
        if self.dev:
            # A capture device has no suffix; treat it as the only video.
            videos = [p]
        ni, nv = len(images), len(videos)

        self.files = images + videos
        self.screen = screen
        self.nf = ni + nv  # number of files
        self.video_flag = [False] * ni + [True] * nv
        self.mode = 'image'
        # if self.grab_screen:
        #     self.nf = 1
        #     self.video_flag = [False]
        #     self.files = [p]
        if videos:
            self.new_video(videos[0])  # new video
        else:
            self.cap = None
        assert self.nf > 0, f'No images or videos found in {p}. ' \
                            f'Supported formats are:\nimages: {img_formats}\nvideos: {vid_formats}'

    def __iter__(self):
        self.count = 0
        return self

    def __next__(self):
        """Return ``(path, img0, cap)`` for the next image or video frame."""
        if self.count == self.nf:
            raise StopIteration
        path = self.files[self.count]

        if self.video_flag[self.count]:
            # Read video
            self.mode = 'video'
            ret_val, img0 = self.cap.read()
            # Videos are listed after all images, so every remaining entry
            # is a video. Keep advancing until a frame is read; a video that
            # yields no frame at all is skipped instead of passing ``None``
            # on to cv2.resize.
            while not ret_val:
                self.count += 1
                self.cap.release()
                if self.count == self.nf:  # last video
                    raise StopIteration
                path = self.files[self.count]
                self.new_video(path)
                ret_val, img0 = self.cap.read()
            # if self.dev:
            #     if (cv2.waitKey(1) & 0xFF) == ord('q'):
            #         raise StopIteration
            self.frame += 1
            print(f'video {self.count + 1}/{self.nf} ({self.frame}/{self.nframes}) {path}: ', end='')
        # elif self.grab_screen:
        #     self.mode = 'video'
        #     img0 = grab(self.screen)
        #     assert img0 is not None, 'Frame Error'
        #     if (cv2.waitKey(1) & 0xFF) == ord('q'):
        #         raise StopIteration
        else:
            # Read image
            self.count += 1
            img0 = cv2.imread(path)  # BGR
            assert img0 is not None, 'Image Not Found ' + path
            # print(f'image {self.count}/{self.nf} {path}: ', end='')

        # Plain resize to 1280x720 (width, height); aspect ratio is NOT
        # preserved and no padding is added here.
        img0 = cv2.resize(img0, (1280, 720), interpolation=cv2.INTER_LINEAR)

        return path, img0, self.cap

    def new_video(self, path):
        """Open ``path`` as the current capture and reset the frame counter."""
        self.frame = 0
        self.cap = cv2.VideoCapture(path)
        self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if self.dev:
            # Capture devices get a fixed placeholder count; it is only
            # used in the progress message.
            self.nframes = 250

    def __len__(self):
        return self.nf  # number of files


def increment_path(path, exist_ok=True, sep=''):
    """Return ``path``, or a numbered variant if it exists and must not be reused.

    When ``path`` does not exist, or ``exist_ok`` is true, ``str(path)`` is
    returned unchanged. Otherwise siblings named ``{path}{sep}{number}`` are
    scanned and ``{path}{sep}{max + 1}`` is returned; numbering starts at 2
    when no numbered sibling exists (runs/exp --> runs/exp2, runs/exp3, ...).
    """
    path = Path(path)  # os-agnostic
    if exist_ok or not path.exists():
        return str(path)

    prefix = f"{path}{sep}"
    # Escape the prefix so glob metacharacters in it are taken literally.
    candidates = glob.glob(glob.escape(prefix) + "*")  # similar paths
    indices = []
    for candidate in candidates:
        # Only digits that directly follow the literal prefix count; this
        # avoids picking up numbers from parent directories or treating the
        # name as a regular expression.
        match = re.match(r"\d+", candidate[len(prefix):])
        if match:
            indices.append(int(match.group()))
    n = max(indices) + 1 if indices else 2  # increment number
    return f"{path}{sep}{n}"  # update path


def letterbox(
    img,
    new_shape=(640, 640),
    color=(114, 114, 114),
    auto=True,
    scaleFill=False,
    scaleup=True,
    stride=32,
):
    """Resize and pad ``img`` while meeting stride-multiple constraints.

    Args:
        img: image array; ``img.shape[:2]`` is read as (height, width).
        new_shape: target (height, width), or a single int for a square.
        color: border fill value passed to ``cv2.copyMakeBorder``.
        auto: pad only up to the next multiple of ``stride`` (minimum
            rectangle) instead of the full ``new_shape``.
        scaleFill: stretch to ``new_shape`` without padding (only used when
            ``auto`` is false).
        scaleup: allow enlarging; when false the image is only scaled down.
        stride: stride used by ``auto``.

    Returns:
        ``(img, ratio, (dw, dh))``: the padded image, the (width, height)
        scale ratios, and the per-side padding in pixels (may be fractional).
    """
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better test mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw = new_shape[1] - new_unpad[0]  # width padding
    dh = new_shape[0] - new_unpad[1]  # height padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = (new_shape[1] / shape[1],
                 new_shape[0] / shape[0])  # width, height ratios

    # divide padding into 2 sides
    dw /= 2
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
    # The +/-0.1 nudges make an odd total padding split as n / n+1.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    img = cv2.copyMakeBorder(
        img,
        top,
        bottom,
        left,
        right,
        cv2.BORDER_CONSTANT,
        value=color,
    )  # add border
    return img, ratio, (dw, dh)


def _make_grid(nx=20, ny=20):
    """Return a float32 array of shape (1, 1, ny, nx, 2) holding (x, y) cell indices."""
    xv, yv = np.meshgrid(np.arange(0, nx), np.arange(0, ny))
    return np.stack((xv, yv), 2).reshape((1, 1, ny, nx, 2)).astype('float32')


def _sigmoid(arr):
    """Element-wise logistic sigmoid; the input is converted to float32."""
    arr = np.array(arr, dtype=np.float32)
    return 1.0 / (1.0 + np.exp(-1.0 * arr))


def split_for_trace_model(pred=None, anchor_grid=None):
    """Decode the three raw detection-head outputs into one prediction array.

    Args:
        pred: sequence of three arrays shaped (bs, 3 * no, ny, nx), one per
            detection level (strides 8, 16, 32). Not modified.
        anchor_grid: indexable per level; ``anchor_grid[i]`` must broadcast
            against the (bs, 3, ny, nx, 2) width/height slice.

    Returns:
        Array of shape (bs, total_boxes, no) with decoded xywh in the first
        four columns and sigmoid-activated scores in the rest.
    """
    num_anchors = 3
    strides = [8, 16, 32]
    decoded = []
    for i, stride in enumerate(strides):
        bs, channels, ny, nx = pred[i].shape
        # Outputs per anchor (85 for the 80-class model) are derived from
        # the channel count instead of being hard-coded.
        no = channels // num_anchors
        # Work on a local array so the caller's ``pred`` is left untouched
        # (this also allows ``pred`` to be a tuple).
        level = pred[i].reshape(bs, num_anchors, no, ny, nx)
        level = level.transpose(0, 1, 3, 4, 2)
        y = _sigmoid(level)
        grid = _make_grid(nx, ny)
        y[..., 0:2] = (y[..., 0:2] * 2. - 0.5 + grid) * stride  # xy
        y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * anchor_grid[i]  # wh
        decoded.append(y.reshape(bs, -1, no))
    return np.concatenate(decoded, 1)


def _xywh2xyxy(x):
    """Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2].

    (x, y) is the box centre; xy1 is top-left and xy2 is bottom-right.
    A new array is returned; ``x`` is not modified.
    """
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y


def _box_iou(box1, box2):
    # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py
    """
    Return intersection-over-union (Jaccard index) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Arguments:
        box1 (ndarray[N, 4])
        box2 (ndarray[M, 4])
    Returns:
        iou (ndarray[N, M]): the NxM matrix containing the pairwise
            IoU values for every element in boxes1 and boxes2
    """

    def box_area(box):
        # box = 4xn
        return (box[2] - box[0]) * (box[3] - box[1])

    area1 = box_area(box1.T)
    area2 = box_area(box2.T)

    # inter(N,M) = clip(rb(N,M,2) - lt(N,M,2), 0).prod(2)
    # NumPy arrays have no ``clamp`` method; np.clip with no upper bound is
    # the equivalent.
    bottom_right = np.minimum(box1[:, None, 2:], box2[:, 2:])
    top_left = np.maximum(box1[:, None, :2], box2[:, :2])
    inter = np.clip(bottom_right - top_left, 0, None).prod(2)
    # iou = inter / (area1 + area2 - inter)
    return inter / (area1[:, None] + area2 - inter)


def _nms(boxes, scores, iou_threshold):
    """Greedy non-maximum suppression.

    Args:
        boxes: (n, 4) array of (x1, y1, x2, y2) boxes.
        scores: (n,) array of confidences.
        iou_threshold: boxes whose overlap with a kept box exceeds this
            value are discarded.

    Returns:
        1-D integer array of kept indices into ``boxes``, ordered by
        decreasing score (empty when ``boxes`` is empty).
    """
    x1, y1 = boxes[:, 0], boxes[:, 1]
    x2, y2 = boxes[:, 2], boxes[:, 3]
    # Widths/heights use the inclusive "+1" pixel convention throughout.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        rest = order[1:]
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[rest] - inter)
        # Keep only the candidates that do not overlap the chosen box.
        order = rest[np.where(ovr <= iou_threshold)[0]]
    # np.array (unlike np.stack) also handles the empty case.
    return np.array(keep, dtype=np.intp)


def non_max_suppression(
    prediction,
    conf_thres=0.25,
    iou_thres=0.45,
    multi_label=False,
    labels=(),
):
    """Runs Non-Maximum Suppression (NMS) on inference results

    Args:
        prediction: array of shape (batch, boxes, 5 + nc) holding
            [cx, cy, w, h, obj_conf, cls_conf...] per box.
        conf_thres: minimum objectness and final confidence.
        iou_thres: IoU threshold passed to ``_nms``.
        multi_label: emit one detection per (box, class) above
            ``conf_thres`` instead of the best class only.
        labels: optional per-image a-priori labels, rows of
            [cls, cx, cy, w, h], appended as confident detections.

    Returns:
         list of detections, one (n,6) array per image [xyxy, conf, cls]
    """
    nc = prediction.shape[2] - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Settings
    max_det = 300  # maximum number of detections per image
    max_nms = 30000  # maximum number of boxes passed to _nms
    time_limit = 10.0  # seconds to quit after
    multi_label = multi_label and nc > 1  # multiple labels per box

    t = time.time()
    # One distinct empty array per image (not the same object repeated).
    output = [np.zeros((0, 6)) for _ in range(prediction.shape[0])]
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        x = x[xc[xi]]  # confidence (boolean indexing returns a copy)

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = np.asarray(labels[xi])
            v = np.zeros((len(lb), nc + 5), dtype=x.dtype)
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[np.arange(len(lb)), lb[:, 0].astype(np.int64) + 5] = 1.0  # cls
            x = np.concatenate((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box (center x, center y, width, height) to (x1, y1, x2, y2)
        box = _xywh2xyxy(x[:, :4])

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = np.nonzero(x[:, 5:] > conf_thres)
            x = np.concatenate(
                (box[i], x[i, j + 5, None], j[:, None].astype(np.float32)),
                1,
            )
        else:  # best class only
            conf = np.max(x[:, 5:], axis=1, keepdims=True)
            j = np.argmax(x[:, 5:], axis=1).reshape(-1, 1)
            x = np.concatenate((box, conf, j.astype('float32')),
                               1)[conf.reshape(-1) > conf_thres]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[np.argsort(-x[:, 4])[:max_nms]]  # sort by confidence

        # NMS. No per-class offset is applied to the boxes, so suppression
        # is class-agnostic: overlapping boxes of different classes compete.
        boxes, scores = x[:, :4], x[:, 4]
        i = _nms(
            boxes,
            scores,
            iou_thres,
        )
        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            print(f'WARNING: NMS time limit {time_limit}s exceeded')
            break  # time limit exceeded

    return output


def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):
    """Rescale coords (xyxy) from img1_shape to img0_shape.

    ``coords`` is modified in place and also returned. When ``ratio_pad`` is
    given it is read as ``((gain, ...), (pad_w, pad_h))``; otherwise gain
    and padding are derived from the two (height, width) shapes.
    """
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0],
                   img1_shape[1] / img0_shape[1])  # gain  = old / new
        pad = ((img1_shape[1] - img0_shape[1] * gain) / 2,
               (img1_shape[0] - img0_shape[0] * gain) / 2)  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    coords[:, [0, 2]] -= pad[0]  # x padding
    coords[:, [1, 3]] -= pad[1]  # y padding
    coords[:, :4] /= gain
    _clip_coords(coords, img0_shape)
    return coords


def _clip_coords(boxes, img_shape):
    """Clip xyxy bounding boxes in place to image shape (height, width)."""
    boxes[:, 0] = np.clip(boxes[:, 0], 0, img_shape[1])  # x1
    boxes[:, 1] = np.clip(boxes[:, 1], 0, img_shape[0])  # y1
    boxes[:, 2] = np.clip(boxes[:, 2], 0, img_shape[1])  # x2
    boxes[:, 3] = np.clip(boxes[:, 3], 0, img_shape[0])  # y2


def _crop_padding(mask, pad_wh):
    """Return a copy of 2-D ``mask`` with ``pad_wh`` pixels cut from each side."""
    cropped = copy.deepcopy(mask)
    pad_w = int(pad_wh[0])
    pad_h = int(pad_wh[1])
    height = int(cropped.shape[0])
    width = int(cropped.shape[1])
    return cropped[pad_h:height - pad_h, pad_w:width - pad_w]


def driving_area_mask(seg, pad_wh=None):
    """Return ``1.0 - seg[0][0]``, optionally cropped by ``pad_wh``.

    ``pad_wh`` is a (pad_w, pad_h) pair; each value is truncated to int and
    that many pixels are removed from both sides of the matching axis.
    """
    if pad_wh is None:
        return 1.0 - seg[0][0]
    return 1.0 - _crop_padding(seg[0][0], pad_wh)


def lane_line_mask(ll, pad_wh=None):
    """Return ``ll[0][0]``, optionally cropped by ``pad_wh``.

    ``pad_wh`` is a (pad_w, pad_h) pair; each value is truncated to int and
    that many pixels are removed from both sides of the matching axis. The
    cropped result is taken from a copy of the input.
    """
    if pad_wh is None:
        return ll[0][0]
    return _crop_padding(ll[0][0], pad_wh)