import argparse
import copy
import os
import platform
import time
from pathlib import Path

import cv2
import numpy as np
import onnxruntime

from utils import utils_onnx
from utils.utils_onnx import increment_path, LoadImages


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--video',
        type=str,
        default='sample.mp4',
    )
    parser.add_argument(
        "-c",
        "--compile",
        action="store_true",
        help="Run in model compilation mode",
    )
    parser.add_argument(
        "-d",
        "--disable_offload",
        action="store_true",
        help="Disable offload to TIDL",
    )
    parser.add_argument(
        '--model',
        type=str,
        default='weight/YOLOPv2.onnx',
    )
    parser.add_argument(
        '--score_th',
        type=float,
        default=0.3,
    )
    parser.add_argument(
        '--nms_th',
        type=float,
        default=0.45,
    )
    parser.add_argument(
        '--save',
        action='store_true',
        help='save images/videos',
    )
    parser.add_argument(
        '--source',
        type=str,
        default='sample.mp4',
        help='input source',
    )
    parser.add_argument(
        '--project',
        default='runs/detect',
        help='save results to project/name',
    )
    parser.add_argument(
        '--name',
        default='exp',
        help='save results to project/name',
    )
    parser.add_argument(
        '--exist-ok',
        action='store_true',
        help='existing project/name ok, do not increment',
    )
    parser.add_argument(
        '--screen',
        type=int,
        default=1,
        help='Screen number you want to use for capturing',
    )
    parser.add_argument(
        '--log',
        type=int,
        default=2,
        help='Log severity level. 0:Verbose, 1:Info, 2:Warning, 3:Error, 4:Fatal',
    )
    args = parser.parse_args()

    return args


def run_inference(
    onnx_session,
    image,
    score_th,
    nms_th,
):
    # Preprocessing
    # Letterbox: resize keeping the aspect ratio, then pad
    input_image = copy.deepcopy(image)
    input_image, _, (pad_w, pad_h) = utils_onnx.letterbox(input_image)

    # BGR -> RGB, HWC -> CHW
    input_image = input_image[:, :, ::-1].transpose(2, 0, 1)

    # Make the array contiguous in memory
    input_image = np.ascontiguousarray(input_image)

    # Normalize to [0, 1]
    input_image = input_image.astype('float32')
    input_image /= 255.0

    # Add batch dimension (NCHW)
    input_image = np.expand_dims(input_image, axis=0)

    # Inference
    input_details = onnx_session.get_inputs()
    input_name = input_details[0].name
    input_shape = input_details[0].shape
    results = onnx_session.run(None, {input_name: input_image})

    result_dets = [results[0][0], results[0][1], results[0][2]]
    anchor_grid = [results[1], results[2], results[3]]

    # Postprocessing
    # Vehicle detection
    result_dets = utils_onnx.split_for_trace_model(
        result_dets,
        anchor_grid,
    )
    result_dets = utils_onnx.non_max_suppression(
        result_dets,
        conf_thres=score_th,
        iou_thres=nms_th,
    )

    bboxes = []
    scores = []
    class_ids = []
    for result_det in result_dets:
        if len(result_det) > 0:
            # Rescale bounding boxes to the original image size
            result_det[:, :4] = utils_onnx.scale_coords(
                input_image.shape[2:],
                result_det[:, :4],
                image.shape,
            ).round()

            # Extract bounding box, score and class ID
            for *xyxy, score, class_id in reversed(result_det):
                x1, y1 = xyxy[0], xyxy[1]
                x2, y2 = xyxy[2], xyxy[3]
                bboxes.append([int(x1), int(y1), int(x2), int(y2)])
                scores.append(float(score))
                class_ids.append(int(class_id))

    # Drivable-area segmentation
    result_road_seg = utils_onnx.driving_area_mask(
        results[4],
        (pad_w, pad_h),
    )

    # Lane-line segmentation
    result_lane_seg = utils_onnx.lane_line_mask(
        results[5],
        (pad_w, pad_h),
    )

    return (bboxes, scores, class_ids), result_road_seg, result_lane_seg
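

# A minimal single-image usage sketch for run_inference() above. The image
# path, model path and the CPU-only provider are illustrative assumptions;
# main() below builds the session with the TIDL execution providers instead.
def example_single_image(image_path='sample.jpg',
                         model_path='weight/YOLOPv2.onnx'):
    session = onnxruntime.InferenceSession(
        model_path, providers=['CPUExecutionProvider'])
    image = cv2.imread(image_path)  # assumes the image exists on disk
    (bboxes, scores, class_ids), road_seg, lane_seg = run_inference(
        session, image, 0.3, 0.45)
    # Overlay detections and segmentation results (FPS label left at 0.0)
    return draw_debug_image(
        image, (bboxes, scores, class_ids), road_seg, lane_seg, 0.0)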


def main():
    # Arguments
    args = get_args()

    # Model compilation is supported on x86 machines only
    if platform.machine() == "aarch64" and args.compile:
        print(
            "Compilation of models is only supported on x86 machines.\n"
            "Please compile on a PC and copy the artifacts for running "
            "on TIDL devices."
        )
        exit(-1)

    source = args.source
    screen = args.screen
    save_img = args.save  # save inference images
    if save_img:
        # Increment run directory (e.g. runs/detect/exp, exp2, ...)
        save_dir = Path(increment_path(
            Path(args.project) / args.name, exist_ok=args.exist_ok))
        save_dir.mkdir(parents=True, exist_ok=True)

    video_path = args.video
    model_path = args.model
    score_th = args.score_th
    nms_th = args.nms_th

    # Download the ONNX weights if they are not present
    if not os.path.isfile(model_path):
        import urllib.request
        url = ('https://github.com/Kazuhito00/YOLOPv2-ONNX-Sample/'
               'releases/download/v0.0.0/YOLOPv2.onnx')
        weights_save_path = 'weight/YOLOPv2.onnx'
        os.makedirs(os.path.dirname(weights_save_path), exist_ok=True)
        print('Start Download: YOLOPv2.onnx')
        urllib.request.urlretrieve(url, weights_save_path)
        print('Finish Download')

    # Load model
    c7x_firmware_version = "11_00_00_00"  # firmware version of the C7x DSP
    compile_options = {}
    so = onnxruntime.SessionOptions()
    so.log_severity_level = args.log
    compile_options['artifacts_folder'] = 'custom-artifacts/yolopv2'
    compile_options['tidl_tools_path'] = os.environ.get("TIDL_TOOLS_PATH")
    compile_options['advanced_options:c7x_firmware_version'] = c7x_firmware_version
    # compile_options['calibration_frames'] = 1
    print(f"compile_options: {compile_options}")

    # Calibration images for TIDL compilation
    calib_images = os.listdir('calib-imgs')

    if args.compile:
        import onnx

        # Recreate the artifacts folder from scratch
        os.makedirs(compile_options["artifacts_folder"], exist_ok=True)
        for root, dirs, files in os.walk(
                compile_options["artifacts_folder"], topdown=False):
            for f in files:
                os.remove(os.path.join(root, f))
            for d in dirs:
                os.rmdir(os.path.join(root, d))

        EP_list = ['TIDLCompilationProvider', 'CPUExecutionProvider']

        # Shape inference is needed for offload to the C7x
        onnx.shape_inference.infer_shapes_path(model_path, model_path)

        onnx_session = onnxruntime.InferenceSession(
            model_path,
            providers=EP_list,
            provider_options=[compile_options, {}],
            sess_options=so,
        )
        print(f"EP: {onnx_session.get_providers()}")
    elif args.disable_offload:
        EP_list = ['CPUExecutionProvider']
        onnx_session = onnxruntime.InferenceSession(
            model_path,
            providers=EP_list,
            sess_options=so,
        )
        print(f"EP: {onnx_session.get_providers()}")
    else:
        EP_list = ['TIDLExecutionProvider', 'CPUExecutionProvider']
        onnx_session = onnxruntime.InferenceSession(
            model_path,
            providers=EP_list,
            provider_options=[compile_options, {}],
            sess_options=so,
        )
        print(f"EP: {onnx_session.get_providers()}")

    vid_path, vid_writer = None, None
    dataset = LoadImages(source, screen=screen)

    # Load video
    # video_capture = cv2.VideoCapture(video_path)

    for path, frame, vid_cap in dataset:
        start_time = time.time()

        # Read frame
        # ret, frame = video_capture.read()
        # if not ret:
        #     break

        # Inference
        (bboxes, scores, class_ids), road_seg, lane_seg = run_inference(
            onnx_session,
            frame,
            score_th,
            nms_th,
        )
        fps = 1 / (time.time() - start_time)

        # Visualize inference results
        debug_image = draw_debug_image(
            frame,
            (bboxes, scores, class_ids),
            road_seg,
            lane_seg,
            fps,
        )

        if not args.compile:
            # Image height and width are needed for drawing mid points on
            # the image prepared from the seg, lane and box results
            img_height, img_width, _ = debug_image.shape

            # cv2.imshow("YOLOPv2", debug_image)
            # key = cv2.waitKey(1)
            # if key == 27:  # ESC
            #     break

            # GStreamer pipeline that pushes frames to the display (kmssink)
            out_pipeline = (
                "appsrc ! videoconvert ! "
                "kmssink driver-name=tidss sync=false"
            )

            # Save image/video with segments, lanes and mid points shown.
            # Not relevant for inference itself.
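            # Note: the video writer below is opened on the GStreamer
            # pipeline above, so frames go to the display (kmssink) rather
            # than to save_path. A plain file writer would look like this
            # (a sketch, assuming an OpenCV build with mp4v support):
            #   vid_writer = cv2.VideoWriter(
            #       save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))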
            if save_img:
                p = Path(path)
                save_path = str(save_dir / p.name)  # e.g. img.jpg
                if dataset.mode == 'image':
                    cv2.imwrite(save_path, debug_image)
                    print(f"The image with the result is saved in: {save_path}")
                else:  # 'video' or 'stream'
                    if vid_path != save_path:  # new video
                        vid_path = save_path
                        if isinstance(vid_writer, cv2.VideoWriter):
                            # Release the previous video writer
                            vid_writer.release()
                        if vid_cap:  # video
                            fps = vid_cap.get(cv2.CAP_PROP_FPS)
                            # w = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
                            # h = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
                            w, h = debug_image.shape[1], debug_image.shape[0]
                        else:  # stream
                            fps, w, h = 30, debug_image.shape[1], debug_image.shape[0]
                        save_path += '.mp4'
                        vid_writer = cv2.VideoWriter(
                            out_pipeline, cv2.CAP_GSTREAMER, 0, fps, (w, h))
                    vid_writer.write(debug_image)

    # video_capture.release()
    # cv2.destroyAllWindows()


def draw_debug_image(
    image,
    car_dets,
    road_seg,
    lane_seg,
    fps,
):
    debug_image = copy.deepcopy(image)

    image_width, image_height = debug_image.shape[1], debug_image.shape[0]

    # Drivable-area segmentation
    # Build the mask image
    road_mask = np.stack((road_seg, ) * 3, axis=-1).astype('float32')
    road_mask = cv2.resize(
        road_mask,
        dsize=(image_width, image_height),
        interpolation=cv2.INTER_LINEAR,
    )
    road_mask = np.where(road_mask > 0.5, 0, 1)

    # Composite the mask with the image (green for the drivable area)
    bg_image = np.zeros(debug_image.shape, dtype=np.uint8)
    bg_image[:] = [0, 255, 0]
    road_mask_image = np.where(road_mask, debug_image, bg_image)

    # Blend as a translucent overlay
    debug_image = cv2.addWeighted(debug_image, 0.5, road_mask_image, 0.5, 1.0)

    # Lane-line segmentation
    # Build the mask image
    lane_mask = np.stack((lane_seg, ) * 3, axis=-1).astype('float32')
    lane_mask = cv2.resize(
        lane_mask,
        dsize=(image_width, image_height),
        interpolation=cv2.INTER_LINEAR,
    )
    lane_mask = np.where(lane_mask > 0.5, 0, 1)

    # Composite the mask with the image (red for lane lines)
    bg_image = np.zeros(debug_image.shape, dtype=np.uint8)
    bg_image[:] = [0, 0, 255]
    lane_mask_image = np.where(lane_mask, debug_image, bg_image)

    # Blend as a translucent overlay
    debug_image = cv2.addWeighted(debug_image, 0.5, lane_mask_image, 0.5, 1.0)

    # Vehicle detection results
    for bbox, score, class_id in zip(*car_dets):
        # Bounding box
        cv2.rectangle(
            debug_image,
            pt1=(bbox[0], bbox[1]),
            pt2=(bbox[2], bbox[3]),
            color=(0, 255, 255),
            thickness=2,
        )

        # Class ID and score
        # text = '%s:%s' % (str(class_id), '%.2f' % score)
        # cv2.putText(
        #     debug_image,
        #     text,
        #     (bbox[0], bbox[1] - 10),
        #     cv2.FONT_HERSHEY_SIMPLEX,
        #     0.7,
        #     color=(0, 255, 255),
        #     thickness=2,
        # )

    # Processing time
    cv2.putText(
        debug_image,
        "FPS:" + '{:.1f}'.format(fps),
        (10, 30),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.8,
        (255, 0, 0),
        2,
        cv2.LINE_AA,
    )

    return debug_image


if __name__ == "__main__":
    main()
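
# Example invocations (a sketch; the script filename and the TIDL_TOOLS_PATH
# value are assumptions, the flags come from get_args() above):
#
#   Compile the model and generate TIDL artifacts (x86 PC only):
#     TIDL_TOOLS_PATH=/path/to/tidl_tools python3 yolopv2_demo.py --compile
#
#   Run on the device with TIDL offload, saving results:
#     python3 yolopv2_demo.py --source sample.mp4 --save
#
#   Run on CPU only (no TIDL offload):
#     python3 yolopv2_demo.py --disable_offload --source sample.mp4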