Hi, I am trying to run multiple models with multi-threading (Python) using the Edge AI TIDL TFLite runtime.
I have attached sample code below that mimics our scenario quite closely. When I run it, it freezes (there is no "APP Deinit" log). However, if I run the same code without instantiating the second model, it works fine ("APP Deinit" etc. is logged on the terminal).
You can substitute any model and its corresponding artifacts as you see fit; I could reproduce this behavior with every model I tried. Unfortunately, I cannot share our models.
I suspect there is some shared resource that is not released, which causes the freeze. Is there a better way to handle a multiple-models, multi-threading scenario?
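In case it helps frame the question: one alternative I am considering is isolating each model in its own process, so that no TIDL state is shared at all. Below is a minimal sketch of that idea (untested against this issue on my side; the model paths are the same placeholders as in the attached code):

import multiprocessing as mp

def run_model_in_process(model_path, artifacts_path, num_images=10):
    # Imports live inside the worker so each spawned process builds its
    # own delegate and interpreter; nothing TIDL-related is shared.
    import os
    import numpy as np
    import tflite_runtime.interpreter as tflite

    os.environ["SOC"] = "am68a"
    os.environ["TIDL_TOOLS_PATH"] = "/home/root/edgeai-tidl-tools/"
    compile_options = {"tidl_tools_path": os.environ["TIDL_TOOLS_PATH"],
                       "artifacts_folder": artifacts_path}
    delegate = [tflite.load_delegate("libtidl_tfl_delegate.so", compile_options)]
    interpreter = tflite.Interpreter(model_path=model_path,
                                     experimental_delegates=delegate)
    interpreter.resize_tensor_input(0, (1, 256, 320, 3))
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    for _ in range(num_images):
        img = np.random.randint(0, 255, (1, 256, 320, 3)).astype(np.float32)
        interpreter.set_tensor(input_details[0]['index'], img)
        interpreter.invoke()
        interpreter.get_tensor(output_details[0]['index'])

if __name__ == "__main__":
    # "spawn" so no delegate state is inherited from the parent via fork.
    ctx = mp.get_context("spawn")
    models = [("some_model_1.tflite", "some_model_1_artifacts"),
              ("some_model_2.tflite", "some_model_2_artifacts")]
    procs = [ctx.Process(target=run_model_in_process, args=(m, a)) for m, a in models]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

This trades memory and startup cost for isolation, so I would prefer a supported in-process solution if one exists.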
Looking forward to your response.
Best
Ashay
import os
import queue
import threading

import numpy as np
import tflite_runtime.interpreter as tflite

# Must be set before the TIDL delegate is loaded.
os.environ["SOC"] = "am68a"
os.environ["TIDL_TOOLS_PATH"] = "/home/root/edgeai-tidl-tools/"
def get_image():
    # Random data standing in for a real camera frame.
    img = np.random.randint(0, 255, (1, 256, 320, 3))
    return img.astype(np.float32)
class ImageProcessor:
    def __init__(self, model_path, model_artifacts, num_threads=2):
        self.model_path = model_path
        self.model_artifacts = model_artifacts
        self.compile_options = {
            "tidl_tools_path": os.environ["TIDL_TOOLS_PATH"],
            "artifacts_folder": model_artifacts,
        }
        self.shared_obj_path = "libtidl_tfl_delegate.so"
        self.tidl_delegate = [
            tflite.load_delegate(self.shared_obj_path, self.compile_options)
        ]
        self.interpreter = tflite.Interpreter(
            model_path=self.model_path,
            experimental_delegates=self.tidl_delegate,
        )
        self.image_queue = queue.Queue()
        self.lock = threading.Lock()
        self.num_threads = num_threads  # was hard-coded to 2, ignoring the argument
    def run_inference(self, input_image):
        input_details = self.interpreter.get_input_details()
        output_details = self.interpreter.get_output_details()
        # Resizing and allocating on every call is redundant but kept
        # here to match the original repro; it could move to __init__.
        self.interpreter.resize_tensor_input(0, (1, 256, 320, 3))
        self.interpreter.allocate_tensors()
        self.interpreter.set_tensor(input_details[0]['index'], input_image)
        self.interpreter.invoke()
        return self.interpreter.get_tensor(output_details[0]['index'])
    def worker(self):
        # The per-instance lock serializes inference within this instance
        # only; see the shared-lock variant after this listing.
        while True:
            image = self.image_queue.get()
            if image is None:  # sentinel: no more work
                break
            with self.lock:
                self.run_inference(image)
    def process_image(self):
        threads = []
        for _ in range(10):
            self.image_queue.put(get_image())
        for _ in range(self.num_threads):
            thread = threading.Thread(target=self.worker)
            thread.start()
            threads.append(thread)
        # One sentinel per worker so every thread exits its loop.
        for _ in range(self.num_threads):
            self.image_queue.put(None)
        for thread in threads:
            thread.join()
        print("Image processed")  # moved after join; it previously printed before the work finished
def main():
    first_model_filename = "some_model_1.tflite"
    first_model_artifact = "some_model_1_artifacts"
    second_model_filename = "some_model_2.tflite"
    second_model_artifact = "some_model_2_artifacts"
    processor_1 = ImageProcessor(first_model_filename, first_model_artifact)
    # Per the behavior described above, the freeze appears once this
    # second model is instantiated; with only processor_1 the script
    # runs to completion.
    processor_2 = ImageProcessor(second_model_filename, second_model_artifact)
    processor_1.process_image()
    processor_2.process_image()
    for thread in threading.enumerate():
        print(thread.name)
    return "Successful"

if __name__ == "__main__":
    status = main()
    print(status)
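For completeness, the other variant I could try is a single process-wide lock shared by both ImageProcessor instances, in case the TIDL delegate does not tolerate concurrent invoke() calls from different interpreters in the same process (an assumption on my part, not something I have confirmed):

import threading

# One lock shared by every ImageProcessor instance, instead of one per
# instance, so invoke() never runs concurrently for the two models.
GLOBAL_TIDL_LOCK = threading.Lock()

# In ImageProcessor.__init__, replace
#     self.lock = threading.Lock()
# with
#     self.lock = GLOBAL_TIDL_LOCK
# worker() stays unchanged: "with self.lock:" then serializes inference
# across both models as well as across the worker threads.

If either of these is the recommended pattern, or if there is a cleaner supported way, please let me know.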