Because of the holidays, TI E2E™ design support forum responses will be delayed from Dec. 25 through Jan. 2. Thank you for your patience.

This thread has been locked.

If you have a related question, please click the "Ask a related question" button in the top right corner. The newly created question will be automatically linked to this question.

SK-AM62A-LP: Problem with speed and responsiveness of python script

Part Number: SK-AM62A-LP

Tool/software:

Hello! I am using the attached Python script to run anomaly detection on the AM62A board. I have already generated the model artifacts and am using them. I am also using OpenCV for the GUI. The script works, but the mouse response is very slow and the pointer does not move smoothly. The inference speed is also slightly reduced. Why is this happening, and how can I optimize the code to avoid these problems and ensure a smooth, fast run? Thank you :)

import gi
import os
import numpy as np
import cv2  # OpenCV for video display and overlay
import tflite_runtime.interpreter as tflite
import time  # Import time for performance tracking

# Ensure the correct version of GStreamer is loaded
gi.require_version('Gst', '1.0')
from gi.repository import Gst

# Initialize global variables
# These are module-level state shared by main(), draw_gui() and the on_mouse() callback.
anomaly_threshold = 2.8  # Per-frame max score above this is flagged; set by clicking the slider
overlay_heatmap_enabled = True  # Toggled by the on-screen checkbox
score = 0.0  # Most recent mean anomaly score, displayed in the GUI
is_anomaly = False  # Most recent verdict, displayed in the GUI
tflite_model_path = '/opt/edgeai-tidl-artifacts/cl-tfl-fomoad/trained.tflite'
artifacts_folder = '/opt/edgeai-tidl-artifacts/cl-tfl-fomoad'
interpreter = None  # Created in main(); may be replaced by load_model_and_artifacts()
cap = None  # Global video capture object

# Button positions
# Each entry maps a button label to its (x1, y1, x2, y2) rectangle in GUI pixel coordinates.
button_positions = {
    'Load': (750, 580, 950, 630),
    'Take': (750, 500, 950, 550)  # 'Take' button rectangle
}

# Ensure the datasets_good folder exists
# (capture_image() writes its JPEG files into this folder.)
os.makedirs('datasets_good', exist_ok=True)

# Initialize the image index
# Used as the numeric suffix of the next captured image file name.
image_index = 0

def gstreamer_pipeline():
    """Return the GStreamer launch string used to open the camera.

    The pipeline reads 640x480 rggb10 Bayer frames at 30 fps from
    /dev/video3, passes them through the tiovxisp element to obtain NV12,
    converts them to BGR with videoconvert and ends in an appsink.
    """
    # All tiovxisp properties belong to a single pipeline stage.
    isp_stage = ' '.join([
        'tiovxisp',
        'sink_0::device=/dev/v4l-subdev2',
        'sensor-name="SENSOR_SONY_IMX219_RPI"',
        'dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss_10b_640x480.bin',
        'sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a_10b_640x480.bin',
        'format-msb=9',
    ])
    stages = [
        'v4l2src device=/dev/video3 io-mode=dmabuf-import',
        'video/x-bayer, width=640, height=480, framerate=30/1, format=rggb10',
        isp_stage,
        'video/x-raw, format=NV12, width=640, height=480, framerate=30/1',
        'videoconvert',
        'video/x-raw, format=BGR',
        'appsink',
    ]
    return ' ! '.join(stages)

def preprocess_frame(frame, target_size=(96, 96)):
    """Turn a captured frame into a model input batch.

    The frame is resized to ``target_size`` with ``cv2.resize``, converted
    to float32, divided by 255 and given a leading batch axis. The elapsed
    time is printed.

    Args:
        frame: Image array as returned by the video capture.
        target_size: Size passed to ``cv2.resize``.

    Returns:
        A float32 array with one extra leading dimension of length 1.
    """
    began = time.time()
    resized = cv2.resize(frame, target_size)
    scaled = resized.astype('float32') / 255.0  # values now in [0, 1]
    batch = scaled[np.newaxis, ...]  # prepend the batch dimension
    print(f"Preprocessing time: {time.time() - began:.4f} seconds")
    return batch

def generate_grid_heatmap(scores, target_size):
    """Build a colour heatmap image from a 2D grid of anomaly scores.

    The scores are min-max normalised to 0..255, upscaled to ``target_size``
    with nearest-neighbour replication (so each score appears as a flat
    cell), and coloured with the JET colour map. The elapsed time is printed.

    The upscaling uses integer index arithmetic instead of a per-cell Python
    loop. Unlike fixed ``target // grid`` cell sizes, it covers the whole
    target area even when the target size is not a multiple of the grid size,
    and it still yields a meaningful map when the grid is larger than the
    target. When the target is an exact multiple of the grid, the result is
    the same as filling equal-sized cells.

    Args:
        scores: 2D array of scores (rows, columns).
        target_size: ``(width, height)`` of the heatmap to produce.

    Returns:
        The colour-mapped heatmap produced by ``cv2.applyColorMap`` for a
        ``(height, width)`` uint8 input.
    """
    start_time = time.time()
    normalized_scores = cv2.normalize(scores, None, 0, 255, cv2.NORM_MINMAX)
    normalized_scores = np.uint8(normalized_scores)

    score_height, score_width = normalized_scores.shape
    target_width, target_height = target_size

    # For every output row/column, pick the grid cell it falls into.
    # Pure integer math keeps cell boundaries exact.
    row_index = (np.arange(target_height) * score_height) // target_height
    col_index = (np.arange(target_width) * score_width) // target_width
    heatmap = normalized_scores[row_index[:, None], col_index[None, :]]

    # Apply color map to heatmap
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    print(f"Heatmap generation time: {time.time() - start_time:.4f} seconds")
    return heatmap

def overlay_heatmap(image, heatmap, alpha=0.6):
    """Blend a heatmap over an image and print how long it took.

    Args:
        image: Base image.
        heatmap: Heatmap image to blend on top; weighted by ``alpha``.
        alpha: Weight of the heatmap; the image gets ``1 - alpha``.

    Returns:
        The result of ``cv2.addWeighted`` for the two inputs.
    """
    began = time.time()
    image_weight = 1 - alpha
    blended = cv2.addWeighted(heatmap, alpha, image, image_weight, 0)
    print(f"Overlay heatmap time: {time.time() - began:.4f} seconds")
    return blended

def draw_gui(frame):
    """Compose the full GUI image around the live video frame.

    A black 1000x700 canvas is created, ``frame`` is copied into the region
    rows 50..529 / columns 50..689, and the threshold slider, heatmap
    checkbox, score, status text and buttons are drawn to the right of and
    below it. Reads the module-level ``anomaly_threshold``,
    ``overlay_heatmap_enabled``, ``score``, ``is_anomaly`` and
    ``button_positions``. The elapsed time is printed.

    Args:
        frame: Image assigned into the 480x640 video region of the canvas.

    Returns:
        The composed GUI image as a uint8 array of shape (700, 1000, 3).
    """
    began = time.time()
    font = cv2.FONT_HERSHEY_SIMPLEX
    white = (255, 255, 255)
    red = (0, 0, 255)
    green = (0, 255, 0)

    # Black background with the video placed at the top-left.
    canvas = np.zeros((700, 1000, 3), dtype=np.uint8)
    canvas[50:530, 50:690] = frame

    # Threshold slider: white track with a red fill proportional to the value.
    cv2.putText(canvas, 'Anomaly Threshold', (750, 100), font, 0.6, white, 1)
    cv2.rectangle(canvas, (750, 150), (950, 170), white, -1)
    fill_right = int(750 + anomaly_threshold * 20)
    cv2.rectangle(canvas, (750, 150), (fill_right, 170), red, -1)

    # Heatmap checkbox: outline always, filled green when enabled.
    cv2.putText(canvas, 'Overlay Heatmap', (750, 220), font, 0.6, white, 1)
    cv2.rectangle(canvas, (750, 250), (770, 270), white, 2)
    if overlay_heatmap_enabled:
        cv2.rectangle(canvas, (750, 250), (770, 270), green, -1)

    # Score and verdict.
    cv2.putText(canvas, f'Score: {score:.2f}', (750, 320), font, 0.6, white, 1)
    if is_anomaly:
        verdict, verdict_color = 'Anomalous', red
    else:
        verdict, verdict_color = 'Good', green
    cv2.putText(canvas, verdict, (750, 360), font, 0.6, verdict_color, 2)

    # Buttons: grey filled rectangle with a black label.
    for caption, box in button_positions.items():
        left, top, right, bottom = box
        cv2.rectangle(canvas, (left, top), (right, bottom), (200, 200, 200), -1)
        cv2.putText(canvas, caption, (left + 10, top + 35), font, 0.6, (0, 0, 0), 1)

    print(f"GUI drawing time: {time.time() - began:.4f} seconds")
    return canvas

def on_mouse(event, x, y, flags, param):
    """Handle mouse events for GUI interaction.

    Only left-button presses are acted on. Depending on where the click
    lands it sets ``anomaly_threshold`` from the slider position, toggles
    ``overlay_heatmap_enabled``, or triggers the 'Load' / 'Take' button
    actions. Button rectangles are taken from ``button_positions`` so that
    the clickable area always matches what ``draw_gui`` draws.

    Args:
        event: OpenCV mouse event code.
        x: Pointer x coordinate in window pixels.
        y: Pointer y coordinate in window pixels.
        flags: Unused.
        param: Unused.
    """
    global anomaly_threshold, overlay_heatmap_enabled
    if event != cv2.EVENT_LBUTTONDOWN:
        return

    def inside(box):
        """Return True when the click lies within (x1, y1, x2, y2), edges included."""
        x1, y1, x2, y2 = box
        return x1 <= x <= x2 and y1 <= y <= y2

    if inside((750, 150, 950, 170)):
        # Slider track: 20 pixels per threshold unit, starting at x=750.
        anomaly_threshold = (x - 750) / 20.0
    elif inside((750, 250, 770, 270)):
        # Heatmap checkbox.
        overlay_heatmap_enabled = not overlay_heatmap_enabled
    elif inside(button_positions['Load']):
        load_model_and_artifacts()
    elif inside(button_positions['Take']):
        capture_image()

def capture_image():
    """Grab one frame from the global capture and save it as a JPEG.

    The file is written to ``datasets_good/captured_image_NNNN.jpg`` where
    ``NNNN`` is the current ``image_index``. The index is advanced only when
    the file was actually written, so a failed write does not leave a gap or
    report a file that does not exist. If the capture has not been opened
    yet, or no frame could be read, a failure message is printed instead of
    raising. The elapsed time is printed in every case.
    """
    global image_index, cap
    start_time = time.time()
    filename = f"datasets_good/captured_image_{image_index:04d}.jpg"

    # cap stays None until main() opens the camera.
    if cap is None:
        ret, frame = False, None
    else:
        ret, frame = cap.read()

    if not ret:
        print("Failed to capture image")
    elif cv2.imwrite(filename, frame):  # imwrite returns False when saving fails
        print(f"Image captured and saved as {filename}")
        image_index += 1
    else:
        print(f"Failed to save image as {filename}")
    print(f"Image capture time: {time.time() - start_time:.4f} seconds")

def load_model_and_artifacts():
    """Prompt for a model path and artifacts folder, then reload the model.

    Both values are read from standard input; an empty answer keeps the
    current value. The new interpreter is built and its tensors allocated
    before any global state is touched, so ``tflite_model_path``,
    ``artifacts_folder`` and ``interpreter`` are only replaced when loading
    fully succeeds. On failure the error is printed and the previously
    loaded model stays active. The elapsed time is printed in every case.

    NOTE: ``input()`` blocks until the user answers on the console, so the
    caller is stalled for that long.
    """
    global tflite_model_path, artifacts_folder, interpreter
    start_time = time.time()

    # Prompt user for new model path
    print("Enter new model path or press Enter to keep the default:")
    candidate_model_path = input().strip() or tflite_model_path

    # Prompt user for new artifacts folder path
    print("Enter new artifacts folder path or press Enter to keep the default:")
    candidate_artifacts_folder = input().strip() or artifacts_folder

    # Build the replacement in local variables; commit only on success.
    try:
        tidl_delegate = [tflite.load_delegate('libtidl_tfl_delegate.so',
                                              {'artifacts_folder': candidate_artifacts_folder})]
        new_interpreter = tflite.Interpreter(model_path=candidate_model_path,
                                             experimental_delegates=tidl_delegate)
        new_interpreter.allocate_tensors()
    except Exception as e:
        print(f"Failed to load model: {e}")
    else:
        tflite_model_path = candidate_model_path
        artifacts_folder = candidate_artifacts_folder
        interpreter = new_interpreter
        print(f"Model loaded successfully from {tflite_model_path} with artifacts from {artifacts_folder}.")
    print(f"Model loading time: {time.time() - start_time:.4f} seconds")

def main():
    """Run the live anomaly-detection GUI until 'q' is pressed or capture fails.

    Loads the TFLite model with the TIDL delegate, opens the camera through
    the GStreamer pipeline, and then for every frame: preprocesses it, runs
    inference, reduces the output to a 2D score grid, updates the global
    ``score`` / ``is_anomaly``, optionally overlays a heatmap, draws the GUI
    and shows it. Timing for each stage is printed per frame.

    The capture and the OpenCV window are released even if the loop raises.
    Tensor details are re-read whenever the global ``interpreter`` object
    changes, so a model reloaded from the GUI is used with matching tensor
    indices.
    """
    global score, is_anomaly, interpreter, cap

    # Initialize GStreamer
    Gst.init(None)

    # Load the TFLite model with TIDL support
    start_time = time.time()
    try:
        tidl_delegate = [tflite.load_delegate('libtidl_tfl_delegate.so',
                                              {'artifacts_folder': artifacts_folder})]
        interpreter = tflite.Interpreter(model_path=tflite_model_path, experimental_delegates=tidl_delegate)
        interpreter.allocate_tensors()
        print("Model loaded successfully.")
    except Exception as e:
        print(f"Failed to load model: {e}")
        return
    print(f"Model initialization time: {time.time() - start_time:.4f} seconds")

    # Initialize video capture with GStreamer pipeline using OpenCV
    cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER)

    if not cap.isOpened():
        print("Error: Unable to open video source.")
        cap.release()
        return

    # Interpreter whose tensor details are currently cached; None forces the
    # first loop iteration to fetch them.
    active_interpreter = None
    input_details = None
    output_details = None

    try:
        # Create OpenCV window and set mouse callback
        cv2.namedWindow('GUI')
        cv2.setMouseCallback('GUI', on_mouse)

        while cap.isOpened():
            frame_start_time = time.time()
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame.")
                break

            print(f"Frame capture time: {time.time() - frame_start_time:.4f} seconds")

            # The mouse callback may have swapped in a new interpreter;
            # refresh the cached tensor details when that happens.
            if interpreter is not active_interpreter:
                active_interpreter = interpreter
                input_details = active_interpreter.get_input_details()
                output_details = active_interpreter.get_output_details()

            # Preprocess frame: Resize to 96x96 as required by the model
            input_data = preprocess_frame(frame, target_size=(96, 96))

            # Run inference with TensorFlow Lite and TIDL
            inference_start_time = time.time()
            active_interpreter.set_tensor(input_details[0]['index'], input_data)
            active_interpreter.invoke()
            print(f"Inference time: {time.time() - inference_start_time:.4f} seconds")

            # Get the anomaly scores (output is a 4D tensor)
            anomaly_scores = active_interpreter.get_tensor(output_details[0]['index'])

            # Reduce the 4D tensor to a 2D matrix by taking the mean across the depth dimension
            anomaly_scores_2d = np.mean(anomaly_scores[0], axis=-1)

            # Calculate max and mean anomaly scores from the 2D matrix
            max_anomaly_score = np.max(anomaly_scores_2d)
            mean_anomaly_score = np.mean(anomaly_scores_2d)
            score = mean_anomaly_score

            # Overlay the heatmap on the original image if enabled
            if overlay_heatmap_enabled:
                heatmap = generate_grid_heatmap(anomaly_scores_2d, target_size=(frame.shape[1], frame.shape[0]))
                overlayed_image = overlay_heatmap(frame, heatmap)
            else:
                # No copy needed: draw_gui copies the pixels into its own canvas.
                overlayed_image = frame

            # Determine if the sample is an anomaly based on the threshold
            is_anomaly = max_anomaly_score > anomaly_threshold

            # Draw GUI elements around the live stream display
            gui_frame = draw_gui(overlayed_image)

            # Display the frame with OpenCV
            display_start_time = time.time()
            cv2.imshow('GUI', gui_frame)
            print(f"Display update time: {time.time() - display_start_time:.4f} seconds")

            # Keyboard controls
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):  # Quit the application
                break
    finally:
        # Release the video capture and close all OpenCV windows
        cap.release()
        cv2.destroyAllWindows()


# Start the application only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()

  • Hi Pragya,

    It is our pleasure to help.

    I see that you are collecting the processing time for each task. Would you please share some of the times you collected so that we can better assist you?

    Bests,

    Qutaiba

  • Sure, here it is: 
    Frame capture time: 0.0096 seconds
    Preprocessing time: 0.0019 seconds
    Inference time: 0.0018 seconds
    Heatmap generation time: 0.0098 seconds
    Overlay heatmap time: 0.0028 seconds
    GUI drawing time: 0.0019 seconds
    Display update time: 0.0011 seconds
    Frame capture time: 0.0105 seconds
    Preprocessing time: 0.0018 seconds
    Inference time: 0.0019 seconds
    Heatmap generation time: 0.0099 seconds
    Overlay heatmap time: 0.0028 seconds
    GUI drawing time: 0.0020 seconds
    Display update time: 0.0011 seconds
    Frame capture time: 0.0102 seconds

  • Hi Pragya,

    While the times you shared look fine, the heavy lifting is in rendering the GUI window. The AM62A is not equipped with a GPU, so rendering the GUI window is executed in softpipe on the ARM cores. Instead of using the OpenCV GUI window, I suggest using a GStreamer pipeline with kmssink to display the output frame directly. This is an example: https://github.com/TexasInstruments-Sandbox/edgeai-gst-apps-people-tracking 

    Best regards,

    Qutaiba