Tool/software:
I am trying to run the following code on the board, but instead of using the OpenCV GUI window, I want to use a GStreamer pipeline with kmssink to display the output frames directly. I am following this example: https://github.com/TexasInstruments-Sandbox/edgeai-gst-apps-people-tracking

import numpy as np
import tflite_runtime.interpreter as tflite # Use tflite_runtime for TIDL delegate support
import faiss
import cv2
from PIL import Image
import os
MODEL_PATH = '/opt/edgeai-tidl-artifacts/cl-tfl-fomo-01/trained.tflite'
CAPTURED_IMAGES_DIR = './captured_frames' # Directory to save captured frames
FAISS_INDEX_PATH = 'faiss_index.bin'
ANOMALY_THRESHOLD = 0.5
# Preprocess frame to match model input
def process_frame(frame, interpreter):
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    input_shape = input_details[0]['shape']  # [1, height, width, channels]
    img_resized = img.resize((input_shape[2], input_shape[1]))  # PIL expects (width, height)
    np_frame = np.array(img_resized, dtype=np.float32) / 255.0
    np_frame = np.expand_dims(np_frame, axis=0)  # Add batch dimension
    interpreter.set_tensor(input_details[0]['index'], np_frame)
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])
    return output_data
# Build the FAISS index using reference ("good") examples
def build_faiss_index(captured_images, interpreter, output_resolution=[12, 12]):
    first_example_response = process_frame(cv2.imread(captured_images[0]), interpreter)
    feature_dim = first_example_response.shape[-1]
    faiss_index = faiss.IndexFlatL2(feature_dim)
    for image_path in captured_images:
        response = process_frame(cv2.imread(image_path), interpreter)
        for i in range(output_resolution[0]):
            for j in range(output_resolution[1]):
                patch_features = response[0, i, j, :].reshape(1, -1)
                faiss_index.add(patch_features.astype(np.float32))
    return faiss_index
# Save FAISS index to disk
def save_faiss_index(faiss_index, index_path):
    faiss.write_index(faiss_index, index_path)
# Load FAISS index from disk
def load_faiss_index(index_path):
    return faiss.read_index(index_path)
# Compute anomaly scores using FAISS
def compute_anomaly_faiss(image_features, faiss_index, output_resolution):
    distances = np.zeros(output_resolution)
    for i in range(output_resolution[0]):
        for j in range(output_resolution[1]):
            f_vec = image_features[0, i, j, :].reshape(1, -1).astype(np.float32)  # FAISS expects float32
            distance, _ = faiss_index.search(f_vec, 1)
            distances[i, j] = distance[0][0]
    return distances
# GStreamer pipeline configuration for TI AM62A board
def gstreamer_pipeline():
    return (
        'v4l2src device=/dev/video3 io-mode=dmabuf-import ! '
        'video/x-bayer, width=640, height=480, framerate=15/1, format=rggb10 ! '
        'tiovxisp sink_0::device=/dev/v4l-subdev2 sensor-name="SENSOR_SONY_IMX219_RPI" '
        'dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss_10b_640x480.bin '
        'sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a_10b_640x480.bin format-msb=9 ! '
        'video/x-raw, format=NV12, width=640, height=480, framerate=15/1 ! '
        'videoconvert ! video/x-raw, format=BGR ! appsink'
    )
# Webcam stream with training and inference functionality
def evaluate_fomo_stream(interpreter):
    cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER)
    captured_images = []
    faiss_index = None  # No FAISS index initially

    def capture_frame(event, x, y, flags, param):
        nonlocal captured_images
        if event == cv2.EVENT_LBUTTONDOWN:
            print("Capturing frame for training...")
            img_path = os.path.join(CAPTURED_IMAGES_DIR, f'captured_{len(captured_images)}.png')
            cv2.imwrite(img_path, frame)  # Save the frame without heatmap
            captured_images.append(img_path)
            print(f"Captured and saved: {img_path}")

    cv2.namedWindow("Anomaly Detection")
    cv2.setMouseCallback("Anomaly Detection", capture_frame)
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            break
        # If we have enough images for training (say, 10), build FAISS index
        if len(captured_images) >= 10 and faiss_index is None:
            print("Building FAISS index with captured images...")
            faiss_index = build_faiss_index(captured_images, interpreter)
            save_faiss_index(faiss_index, FAISS_INDEX_PATH)
            print("FAISS index saved and inference mode is enabled.")
        if faiss_index is not None:
            image_feat = process_frame(frame, interpreter)
            anomaly_score = compute_anomaly_faiss(image_feat, faiss_index, [12, 12])
            global_anomaly_score = np.mean(anomaly_score)
            classification = 'Anomaly' if global_anomaly_score > ANOMALY_THRESHOLD else 'No Anomaly'
            anomaly_heatmap = cv2.resize(anomaly_score, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_NEAREST)
            anomaly_heatmap_normalized = cv2.normalize(anomaly_heatmap, None, 0, 255, cv2.NORM_MINMAX)
            anomaly_heatmap_colored = cv2.applyColorMap(np.uint8(anomaly_heatmap_normalized), cv2.COLORMAP_JET)
            overlay_frame = cv2.addWeighted(frame, 0.6, anomaly_heatmap_colored, 0.4, 0)
            cv2.putText(overlay_frame, f'Anomaly Score: {global_anomaly_score:.2f}', (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
            cv2.putText(overlay_frame, f'Classification: {classification}', (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (0, 0, 255) if classification == 'Anomaly' else (0, 255, 0), 2)
            cv2.imshow("Anomaly Detection", overlay_frame)
        else:
            cv2.imshow("Anomaly Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
# Create a directory to save captured frames
if not os.path.exists(CAPTURED_IMAGES_DIR):
    os.makedirs(CAPTURED_IMAGES_DIR)
# Load the TFLite model with TIDL delegate support
tidl_delegate = [tflite.load_delegate('libtidl_tfl_delegate.so', {'artifacts_folder': '/opt/edgeai-tidl-artifacts/cl-tfl-fomo-01'})]
interpreter = tflite.Interpreter(model_path=MODEL_PATH, experimental_delegates=tidl_delegate)
interpreter.allocate_tensors()
# Start real-time anomaly detection with live training
evaluate_fomo_stream(interpreter)
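For the display side, what I had in mind is to replace cv2.imshow with a second GStreamer pipeline fed by appsrc and ending in kmssink, written to through cv2.VideoWriter. This is only an untested sketch (reusing the imports from the script above); the caps and the kmssink properties (driver-name=tidss, sync=false) are my guesses based on other TI examples, not something I have verified:

# Untested sketch: push BGR frames into appsrc -> kmssink instead of cv2.imshow.
def kmssink_writer(width=640, height=480, fps=15):
    out_pipeline = (
        f'appsrc ! video/x-raw, format=BGR, width={width}, height={height}, framerate={fps}/1 ! '
        'videoconvert ! video/x-raw, format=NV12 ! '
        'kmssink driver-name=tidss sync=false'  # driver name is an assumption for AM62A
    )
    # fourcc=0 tells OpenCV to pass raw frames straight into the pipeline
    return cv2.VideoWriter(out_pipeline, cv2.CAP_GSTREAMER, 0, float(fps), (width, height), True)

# usage inside the loop, in place of cv2.imshow("Anomaly Detection", overlay_frame):
# writer = kmssink_writer()   # created once, before the loop
# writer.write(overlay_frame)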
But I am a little confused as to how I should break the code down:
1. Which parts would I need as pre-processing and which as post-processing?
2. How can I reproduce the mouse-click function (clicking ten times to capture training frames, then generating and saving the FAISS index) once the output goes to kmssink instead of an OpenCV window? One idea I had is sketched below.
3. How do I create the GStreamer pipeline? Do I even have to construct it manually, or is it generated automatically by the example above? Which parts of the pipeline are already taken care of through the scripts in the example?
4. I want to use OptiFlow so that the pipeline is not slow; how does that fit in?
If you could please help me break down the classes/files/pipeline I would need, it would be great.
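Regarding question 2: since kmssink renders straight to the display, there is no OpenCV window for cv2.setMouseCallback to attach to. One idea I had is to trigger captures from the terminal with a non-blocking stdin read instead; again just a sketch, not something from the example:

import sys
import select

def stdin_trigger():
    # Non-blocking check whether a line was typed in the terminal;
    # returns the stripped line, or None if nothing is pending.
    ready, _, _ = select.select([sys.stdin], [], [], 0)
    return sys.stdin.readline().strip() if ready else None

# inside the capture loop, instead of the mouse callback:
# if len(captured_images) < 10 and stdin_trigger() == 'c':
#     img_path = os.path.join(CAPTURED_IMAGES_DIR, f'captured_{len(captured_images)}.png')
#     cv2.imwrite(img_path, frame)
#     captured_images.append(img_path)

Would something like this work inside the edgeai-gst-apps structure, or is there a cleaner way to handle user input there?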
Here is the link to the config YAML file I have made; could you please tell me if it is alright for this purpose? drive.google.com/.../view
Thank you