Tool/software:
Hello! I am using the attached python script to run anomaly detection on the am62a board. I have already generated the model artifacts and am using those. I am using opencv for the gui as well. Now, this script works but the mouse response time is very slow, the mouse doesn't move smoothly. Also, the inference speed is reduced a little bit. Why is that happening and how can I optimize this code to avoid these problems and ensure a smooth and fast run? Thank you
import gi import os import numpy as np import cv2 # OpenCV for video display and overlay import tflite_runtime.interpreter as tflite import time # Import time for performance tracking # Ensure the correct version of GStreamer is loaded gi.require_version('Gst', '1.0') from gi.repository import Gst # Initialize global variables anomaly_threshold = 2.8 overlay_heatmap_enabled = True score = 0.0 is_anomaly = False tflite_model_path = '/opt/edgeai-tidl-artifacts/cl-tfl-fomoad/trained.tflite' artifacts_folder = '/opt/edgeai-tidl-artifacts/cl-tfl-fomoad' interpreter = None cap = None # Global video capture object # Button positions button_positions = { 'Load': (750, 580, 950, 630), 'Take': (750, 500, 950, 550) # Add 'Take' button coordinates } # Ensure the datasets_good folder exists os.makedirs('datasets_good', exist_ok=True) # Initialize the image index image_index = 0 def gstreamer_pipeline(): return ( 'v4l2src device=/dev/video3 io-mode=dmabuf-import ! ' 'video/x-bayer, width=640, height=480, framerate=30/1, format=rggb10 ! ' 'tiovxisp sink_0::device=/dev/v4l-subdev2 sensor-name="SENSOR_SONY_IMX219_RPI" ' 'dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss_10b_640x480.bin sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a_10b_640x480.bin format-msb=9 ! ' 'video/x-raw, format=NV12, width=640, height=480, framerate=30/1 ! videoconvert ! video/x-raw, format=BGR ! appsink' ) def preprocess_frame(frame, target_size=(96, 96)): start_time = time.time() frame_resized = cv2.resize(frame, target_size) frame_normalized = frame_resized.astype('float32') / 255.0 # Normalize to [0, 1] frame_normalized = np.expand_dims(frame_normalized, axis=0) # Add batch dimension print(f"Preprocessing time: {time.time() - start_time:.4f} seconds") return frame_normalized def generate_grid_heatmap(scores, target_size): start_time = time.time() normalized_scores = cv2.normalize(scores, None, 0, 255, cv2.NORM_MINMAX) normalized_scores = np.uint8(normalized_scores) # Get the original size of the scores score_height, score_width = normalized_scores.shape # Calculate the size of each cell in the heatmap cell_width = target_size[0] // score_width cell_height = target_size[1] // score_height # Initialize heatmap with zeros heatmap = np.zeros((target_size[1], target_size[0]), dtype=np.uint8) # Fill the heatmap with upscaled values for i in range(score_height): for j in range(score_width): heatmap[i * cell_height:(i + 1) * cell_height, j * cell_width:(j + 1) * cell_width] = normalized_scores[i, j] # Apply color map to heatmap heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET) print(f"Heatmap generation time: {time.time() - start_time:.4f} seconds") return heatmap def overlay_heatmap(image, heatmap, alpha=0.6): start_time = time.time() output = cv2.addWeighted(heatmap, alpha, image, 1 - alpha, 0) print(f"Overlay heatmap time: {time.time() - start_time:.4f} seconds") return output def draw_gui(frame): """Draws the GUI elements around the live stream display.""" start_time = time.time() global anomaly_threshold, overlay_heatmap_enabled, score, is_anomaly # Set GUI background and dimensions gui_frame = np.zeros((700, 1000, 3), dtype=np.uint8) # Draw video frame into the GUI gui_frame[50:530, 50:690] = frame # Draw slider for anomaly threshold cv2.putText(gui_frame, 'Anomaly Threshold', (750, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) cv2.rectangle(gui_frame, (750, 150), (950, 170), (255, 255, 255), -1) # Trackbar background cv2.rectangle(gui_frame, (750, 150), (int(750 + anomaly_threshold * 20), 170), (0, 0, 255), -1) # Trackbar slider # Draw checkbox for heatmap overlay cv2.putText(gui_frame, 'Overlay Heatmap', (750, 220), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) cv2.rectangle(gui_frame, (750, 250), (770, 270), (255, 255, 255), 2) # Checkbox border if overlay_heatmap_enabled: cv2.rectangle(gui_frame, (750, 250), (770, 270), (0, 255, 0), -1) # Filled checkbox when enabled # Draw score and status cv2.putText(gui_frame, f'Score: {score:.2f}', (750, 320), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) status_text = 'Anomalous' if is_anomaly else 'Good' status_color = (0, 0, 255) if is_anomaly else (0, 255, 0) cv2.putText(gui_frame, status_text, (750, 360), cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2) # Draw buttons for label, (x1, y1, x2, y2) in button_positions.items(): cv2.rectangle(gui_frame, (x1, y1), (x2, y2), (200, 200, 200), -1) cv2.putText(gui_frame, label, (x1 + 10, y1 + 35), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1) print(f"GUI drawing time: {time.time() - start_time:.4f} seconds") return gui_frame def on_mouse(event, x, y, flags, param): """Handles mouse events for GUI interaction.""" global anomaly_threshold, overlay_heatmap_enabled, image_index if event == cv2.EVENT_LBUTTONDOWN: # Check if the click was on the anomaly threshold slider if 750 <= x <= 950 and 150 <= y <= 170: anomaly_threshold = (x - 750) / 20.0 # Check if the click was on the heatmap checkbox elif 750 <= x <= 770 and 250 <= y <= 270: overlay_heatmap_enabled = not overlay_heatmap_enabled # Check if the click was on the Load button elif 750 <= x <= 950 and 580 <= y <= 630: load_model_and_artifacts() # Check if the click was on the Take button elif 750 <= x <= 950 and 500 <= y <= 550: capture_image() def capture_image(): """Captures and saves the current frame as an image.""" global image_index, cap start_time = time.time() filename = f"datasets_good/captured_image_{image_index:04d}.jpg" ret, frame = cap.read() if ret: cv2.imwrite(filename, frame) print(f"Image captured and saved as {filename}") image_index += 1 else: print("Failed to capture image") print(f"Image capture time: {time.time() - start_time:.4f} seconds") def load_model_and_artifacts(): """Prompts the user for a new model and artifacts folder path.""" global tflite_model_path, artifacts_folder, interpreter start_time = time.time() # Prompt user for new model path print("Enter new model path or press Enter to keep the default:") new_model_path = input().strip() if new_model_path: tflite_model_path = new_model_path # Prompt user for new artifacts folder path print("Enter new artifacts folder path or press Enter to keep the default:") new_artifacts_folder = input().strip() if new_artifacts_folder: artifacts_folder = new_artifacts_folder # Reload the model with the new paths try: tidl_delegate = [tflite.load_delegate('libtidl_tfl_delegate.so', {'artifacts_folder': artifacts_folder})] interpreter = tflite.Interpreter(model_path=tflite_model_path, experimental_delegates=tidl_delegate) interpreter.allocate_tensors() print(f"Model loaded successfully from {tflite_model_path} with artifacts from {artifacts_folder}.") except Exception as e: print(f"Failed to load model: {e}") print(f"Model loading time: {time.time() - start_time:.4f} seconds") def main(): global score, is_anomaly, interpreter, cap # Initialize GStreamer Gst.init(None) # Load the TFLite model with TIDL support start_time = time.time() try: tidl_delegate = [tflite.load_delegate('libtidl_tfl_delegate.so', {'artifacts_folder': artifacts_folder})] interpreter = tflite.Interpreter(model_path=tflite_model_path, experimental_delegates=tidl_delegate) interpreter.allocate_tensors() print(f"Model loaded successfully.") except Exception as e: print(f"Failed to load model: {e}") return print(f"Model initialization time: {time.time() - start_time:.4f} seconds") # Initialize video capture with GStreamer pipeline using OpenCV cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER) if not cap.isOpened(): print("Error: Unable to open video source.") return # Get input and output details for the interpreter input_details = interpreter.get_input_details() output_details = interpreter.get_output_details() # Create OpenCV window and set mouse callback cv2.namedWindow('GUI') cv2.setMouseCallback('GUI', on_mouse) while cap.isOpened(): frame_start_time = time.time() ret, frame = cap.read() if not ret: print("Failed to grab frame.") break print(f"Frame capture time: {time.time() - frame_start_time:.4f} seconds") # Preprocess frame: Resize to 96x96 as required by the model input_data = preprocess_frame(frame, target_size=(96, 96)) # Run inference with TensorFlow Lite and TIDL inference_start_time = time.time() interpreter.set_tensor(input_details[0]['index'], input_data) interpreter.invoke() print(f"Inference time: {time.time() - inference_start_time:.4f} seconds") # Get the anomaly scores (output is a 4D tensor) anomaly_scores = interpreter.get_tensor(output_details[0]['index']) # Reduce the 4D tensor to a 2D matrix by taking the mean across the depth dimension anomaly_scores_2d = np.mean(anomaly_scores[0], axis=-1) # Calculate max and mean anomaly scores from the 2D matrix max_anomaly_score = np.max(anomaly_scores_2d) mean_anomaly_score = np.mean(anomaly_scores_2d) score = mean_anomaly_score # Overlay the heatmap on the original image if enabled if overlay_heatmap_enabled: heatmap = generate_grid_heatmap(anomaly_scores_2d, target_size=(frame.shape[1], frame.shape[0])) overlayed_image = overlay_heatmap(frame, heatmap) else: overlayed_image = frame.copy() # Determine if the sample is an anomaly based on the threshold is_anomaly = max_anomaly_score > anomaly_threshold # Draw GUI elements around the live stream display gui_frame = draw_gui(overlayed_image) # Display the frame with OpenCV display_start_time = time.time() cv2.imshow('GUI', gui_frame) print(f"Display update time: {time.time() - display_start_time:.4f} seconds") # Keyboard controls key = cv2.waitKey(1) & 0xFF if key == ord('q'): # Quit the application break # Release the video capture and close all OpenCV windows cap.release() cv2.destroyAllWindows() if __name__ == "__main__": main()