
TDA4VM: Multi threading with multiple models with edge ai tidl tflite

Part Number: TDA4VM


Hi, I am trying to run multiple models with multi-threading (Python) using the edge AI TIDL TFLite runtime.

I have attached a sample code (that mimics our scenario quite closely). If I run this code it freezes (there is no log of APP Deinit). However, if I run this code without instantiating the second model it works fine (APP Deinit etc. is logged on the terminal).

You can choose any model and the corresponding artifacts as you see fit. I could reproduce this behavior with every model. Unfortunately, I cannot share the models.

I suspect there is some shared resource that is not released, which causes the code to freeze. I would like to understand whether there is a better way to handle a multiple-model, multi-threading scenario.
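
In case it clarifies what I mean by releasing the shared resource: the kind of workaround I have in mind (but have not verified) is to drop every reference to the first interpreter and delegate before the second model is instantiated, in case the delegate only frees its resources on destruction. The helper below (run_model_then_release) is purely my own sketch, not an SDK API, and I do not know whether del/gc.collect() actually triggers the TIDL delegate's cleanup:

import gc
import os
import tflite_runtime.interpreter as tflite

def run_model_then_release(model_path, artifacts_folder, images):
    # Hypothetical helper: load one model, run all images, then drop every
    # reference so the delegate can (hopefully) release whatever it shares.
    options = {"tidl_tools_path": os.environ["TIDL_TOOLS_PATH"],
               "artifacts_folder": artifacts_folder}
    delegate = tflite.load_delegate("libtidl_tfl_delegate.so", options)
    interpreter = tflite.Interpreter(model_path=model_path,
                                     experimental_delegates=[delegate])
    interpreter.resize_tensor_input(0, (1, 256, 320, 3))
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    outputs = []
    for img in images:
        interpreter.set_tensor(input_details[0]['index'], img)
        interpreter.invoke()
        outputs.append(interpreter.get_tensor(output_details[0]['index']))
    # Drop references before the next model is created; whether this is enough
    # to release the TIDL delegate's resources is exactly what I am unsure about.
    del interpreter
    del delegate
    gc.collect()
    return outputs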

Looking forward to your response.

Best

Ashay 

import os
import numpy as np
import threading
import queue
import tflite_runtime.interpreter as tflite

# Environment variables expected by the TIDL delegate
os.environ["SOC"] = "am68a"
os.environ["TIDL_TOOLS_PATH"] = "/home/root/edgeai-tidl-tools/"


def get_image():
    # Random input in NHWC layout matching the model's 1x256x320x3 input
    img = np.random.randint(0, 255, (1, 256, 320, 3))
    return img.astype(np.float32)

class ImageProcessor:
    def __init__(self, model_path, model_artifacts, num_threads=2):
        self.model_path = model_path
        self.model_artifacts = model_artifacts
        self.compile_options = {"tidl_tools_path": os.environ["TIDL_TOOLS_PATH"],
                                "artifacts_folder": model_artifacts}
        self.shared_obj_path = "libtidl_tfl_delegate.so"

        # Each ImageProcessor instance loads its own TIDL delegate and interpreter
        self.tidl_delegate = [tflite.load_delegate(self.shared_obj_path, self.compile_options)]
        self.interpreter = tflite.Interpreter(model_path=self.model_path,
                                              experimental_delegates=self.tidl_delegate)
        self.image_queue = queue.Queue()
        self.lock = threading.Lock()
        self.num_threads = num_threads
    

    def run_inference(self, input_image):
        # Tensors are resized and (re)allocated on every call; this mirrors
        # what our application does.
        input_details = self.interpreter.get_input_details()
        output_details = self.interpreter.get_output_details()
        self.interpreter.resize_tensor_input(0, (1, 256, 320, 3))
        self.interpreter.allocate_tensors()
        self.interpreter.set_tensor(input_details[0]['index'], input_image)
        self.interpreter.invoke()
        output = self.interpreter.get_tensor(output_details[0]['index'])

        return output

    def worker(self):
        # Pull images until the None sentinel arrives; the lock serialises
        # access to the shared interpreter across worker threads.
        while True:
            images = self.image_queue.get()
            if images is None:
                break
            with self.lock:
                output = self.run_inference(images)
    
    def process_image(self):
        images = [get_image() for _ in range(10)]
        threads = []

        for img in images:
            self.image_queue.put(img)

        for _ in range(self.num_threads):
            thread = threading.Thread(target=self.worker)
            thread.start()
            threads.append(thread)

        # One None sentinel per worker thread so every worker exits its loop
        for _ in range(self.num_threads):
            self.image_queue.put(None)

        for thread in threads:
            thread.join()
        print("Images processed")


def main():
    first_model_filename = "some_model_1.tflite"
    first_model_artifact = "some_model_1_artifacts"

    second_model_filename = "some_model_2.tflite"
    second_model_artifact = "some_model_2_artifacts"

    # Instantiating only the first processor works fine; creating the second
    # one as well is what leads to the freeze described above.
    processor_1 = ImageProcessor(first_model_filename, first_model_artifact)
    processor_2 = ImageProcessor(second_model_filename, second_model_artifact)

    processor_1.process_image()
    processor_2.process_image()

    # List any threads still alive at the end of the run
    for thread in threading.enumerate():
        print(thread.name)

    return "Successful"

if __name__ == "__main__":
    status = main()
    print(status)
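
For completeness, the other variant I am considering as a workaround is to isolate each model in its own process, so that no TIDL state is shared inside one Python process. This reuses the ImageProcessor class from the snippet above with a different main(); it is only a sketch, and I do not know whether running two TIDL-delegated interpreters from separate processes is actually supported:

import multiprocessing as mp

def run_one_model(model_path, artifacts_folder):
    # Each process builds its own delegate, interpreter and queue, so nothing
    # is shared in-process between the two models.
    processor = ImageProcessor(model_path, artifacts_folder)
    processor.process_image()

def main():
    p1 = mp.Process(target=run_one_model,
                    args=("some_model_1.tflite", "some_model_1_artifacts"))
    p2 = mp.Process(target=run_one_model,
                    args=("some_model_2.tflite", "some_model_2_artifacts"))
    # Run the processes one after the other first, to check whether process
    # isolation alone avoids the freeze before trying them concurrently.
    p1.start()
    p1.join()
    p2.start()
    p2.join()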