TDA4AL-Q1: When do nodes get blocked

Part Number: TDA4AL-Q1

Tool/software:

I am learning OpenVX through the TI's implementation for TDA4AL processor. While I am running the code in host emulation mode, I would like to understand the concept of blocked nodes. I tried the following two cases to understand the blocked nodes, but failed.

1. Deleting references of variables before they could be accessed by deeper nodes: I used a graph pipeline consisting of three nodes (NODE 1, NODE 2, NODE 3), which are connected through vx_image objects and each sleep for 1, 2 and 3 seconds respectively. I enqueued the image object at graph parameter 0, associated with NODE 1 and immediately deleted the references associated with the image objects. This did not stop the execution of the graph pipeline. Can you help me understand why the graph The code is attached below.

#include <stdio.h>
#include <unistd.h>
#include <VX/vx.h>
#include <VX/vx_khr_pipelining.h>
#include <TI/tivx.h>
#include <utility.h>
#include <tivx_openvx_core_kernels.h>


#define IN0_IMG_IDX (0u)
#define SLEEP_PARAM_IDX (1u)
#define OUT0_IMG_IDX (2u)
#define MAX_PARAMS (3u)
#define BUF_SIZE (2u)


static vx_enum kernel_id = (vx_status)VX_ERROR_INVALID_PARAMETERS;
static vx_kernel g_kernel = NULL;


static vx_status VX_CALLBACK kernel_init(vx_node node, vx_reference parameters[], vx_int32 num){
    // printf(" vx_custom_node_1: init SUCCESS ... \n");
    return VX_SUCCESS;
}

static vx_status VX_CALLBACK kernel_run(vx_node node, vx_reference parameters[], vx_uint32 num){
    // vx_image in_image = (vx_image)parameters[IN0_IMG_IDX];
    // vx_image out_image = (vx_image)parameters[OUT0_IMG_IDX];
    vx_scalar sleep_time = (vx_scalar)parameters[SLEEP_PARAM_IDX];
    int t;
    vxCopyScalar(sleep_time, &t, VX_READ_ONLY, VX_MEMORY_TYPE_HOST);
    printf(" \t[ %3.4f ] sleeping for %d seconds \n", tivxPlatformGetTimeInUsecs()/1000000.0, t);
    sleep(t);
    printf(" \t[ %3.4f ] done sleeping for %d seconds \n", tivxPlatformGetTimeInUsecs()/1000000.0, t);
    return VX_SUCCESS;
}

static vx_status VX_CALLBACK kernel_deinit(vx_node node, vx_reference parameters[], vx_uint32 num){
    return VX_SUCCESS;
}

static vx_status VX_CALLBACK kernel_validate(vx_node node,
        const vx_reference parameters[ ],
        vx_uint32 num, vx_meta_format metas[]){
    return VX_SUCCESS;
}

static vx_node get_kernel_node(vx_graph graph, vx_image in, vx_scalar sleep_time, vx_image out){
    vx_node node;
    vx_reference refs[] = {(vx_reference)in, (vx_reference)sleep_time, (vx_reference)out};
    
    node = tivxCreateNodeByKernelEnum(graph,kernel_id,refs,3);
    vxSetReferenceName((vx_reference)node, "NODE");
    return node;
}

static vx_status kernel_create(vx_context context){
    vx_kernel kernel = NULL;
    vx_status status;
    int index=0;

    status = vxAllocateUserKernelId(context, &kernel_id);
    kernel = vxAddUserKernel(
        context,
        "kernel_1",
        kernel_id,
        kernel_run,
        3,
        kernel_validate,
        kernel_init,
        kernel_deinit
    );

    tivxKernelsHostUtilsAddKernelTargetDsp(kernel);
    tivxAddKernelTarget(kernel, TIVX_TARGET_MPU_3);

    // tivxAddKernelTarget(kernel, TIVX_TARGET_DSP1);
    // #ifndef SOC_AM62A
    // tivxAddKernelTarget(kernel, TIVX_TARGET_DSP2);
    // #endif
    // tivxAddKernelTarget(kernel, TIVX_CPU_ID_MPU_0);

    status = vxGetStatus((vx_reference)kernel);

    if(status==VX_SUCCESS){
        status = vxAddParameterToKernel(
            kernel,
            index++,
            (vx_enum)VX_INPUT,
            (vx_enum)VX_TYPE_IMAGE,
            (vx_enum)VX_PARAMETER_STATE_REQUIRED
        );
    }
    printf(" done adding input image parameter \n");
    if(status==VX_SUCCESS){
        status = vxAddParameterToKernel(
            kernel,
            index++,
            (vx_enum)VX_INPUT,
            (vx_enum)VX_TYPE_SCALAR,
            (vx_enum)VX_PARAMETER_STATE_REQUIRED
        );
    }
    printf(" done adding input int parameter \n");
    if(status==VX_SUCCESS){
        status = vxAddParameterToKernel(
            kernel,
            index++,
            (vx_enum)VX_OUTPUT,
            (vx_enum)VX_TYPE_IMAGE,
            (vx_enum)VX_PARAMETER_STATE_REQUIRED
        );
    }
    printf(" done adding output image parameter \n");
    if(status == VX_SUCCESS){
        status = vxFinalizeKernel(kernel);
    }
    if(status != VX_SUCCESS){
        vxReleaseKernel(&kernel);
        kernel = NULL;
    }else{
        g_kernel = kernel;
    }
}

static vx_status kernel_remove(vx_context context){
    vx_status status;
    status = vxRemoveKernel(g_kernel);
    g_kernel = NULL;

    return status;
}


static void add_graph_parameter_by_node_index(vx_graph graph, vx_node node, vx_uint32 node_parameter_index)
{
    vx_parameter parameter = vxGetParameterByIndex(node, node_parameter_index);

    vxAddParameterToGraph(graph, parameter);
    vxReleaseParameter(&parameter);
}

void vx_custom_pipeline(){
    vx_context context;
    vx_graph graph;
    vx_image img1[BUF_SIZE], img2, img3, img4[BUF_SIZE];
    vx_node node_1, node_2, node_3;
    vx_uint32 width=640, height=480;
    vx_status status;

    vx_uint32 num_buf=BUF_SIZE, pipeline_depth=1, buf_id, loop_id, loop_cnt=2, exe_time;
    vx_graph_parameter_queue_params_t graph_params_list[2];

    printf(" Tutorial started !!! \n");

    context = vxCreateContext();

    status = kernel_create(context);
    printf(" done creating kernel, kernel_id = %d \n", kernel_id);
    
    for(int i=0;i<num_buf;i++){
        img1[i] = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
        img4[i] = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
    }

    img2 = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
    img3 = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);

    printf(" done initializing image variables \n");
    
    graph = vxCreateGraph(context);
    vx_int32 tmp = 1;
    vx_scalar sleep_time_1 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);
    tmp = 2;
    vx_scalar sleep_time_2 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);
    tmp = 3;
    vx_scalar sleep_time_3 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);

    node_1 = get_kernel_node(graph, img1[0], sleep_time_1, img2);
    node_2 = get_kernel_node(graph, img2, sleep_time_2, img3);
    node_3 = get_kernel_node(graph, img3, sleep_time_3, img4[0]);

    printf(" node creation done \n");

    vxSetNodeTarget(node_1, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_MPU_3);
    vxSetNodeTarget(node_2, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_MPU_3);
    vxSetNodeTarget(node_3, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_MPU_3);
    // TIVX_TARGET_DSP_C7_1

    add_graph_parameter_by_node_index(graph, node_1, 0);
    add_graph_parameter_by_node_index(graph, node_3, 2);

    graph_params_list[0].graph_parameter_index = 0;
    graph_params_list[0].refs_list_size = num_buf;
    graph_params_list[0].refs_list = (vx_reference *)&img1[0];

    graph_params_list[1].graph_parameter_index = 1;
    graph_params_list[1].refs_list_size = num_buf;
    graph_params_list[1].refs_list = (vx_reference *)&img4[0];

    vxSetGraphScheduleConfig(
        graph,
        (vx_enum)VX_GRAPH_SCHEDULE_MODE_QUEUE_AUTO,
        2,
        graph_params_list
    );

    tivxSetGraphPipelineDepth(graph, pipeline_depth);
    tivxSetNodeParameterNumBufByIndex(node_1, 2, num_buf);
    tivxSetNodeParameterNumBufByIndex(node_2, 2, num_buf);

    status = vxVerifyGraph(graph);

    printf("\n================= Verify graph done =================\n\n");

    exe_time = tivxPlatformGetTimeInUsecs();

    vx_image cur_out_img, cur_in_img;
    uint32_t num_refs;

    /* Running the loop */
    for(buf_id=0;buf_id<BUF_SIZE;buf_id++){
        vxGraphParameterEnqueueReadyRef(graph, 0, (vx_reference *)&img1[buf_id], 1);
        vxGraphParameterEnqueueReadyRef(graph, 1, (vx_reference *)&img4[buf_id], 1);
        for(int i=0;i<num_buf;i++){
            vxReleaseImage(&img1[i]);
            vxReleaseImage(&img4[i]);
        }
        vxReleaseImage(&img2);
        vxReleaseImage(&img3);
        vxReleaseScalar(&sleep_time_1);
        vxReleaseScalar(&sleep_time_2);
        vxReleaseScalar(&sleep_time_3);
    }

    vxWaitGraph(graph);

    exe_time = tivxPlatformGetTimeInUsecs() - exe_time;
    printf(" execution time in seconds: %d \n", exe_time/1000000);

    for(int i=0;i<num_buf;i++){
        vxReleaseImage(&img1[i]);
        vxReleaseImage(&img4[i]);
    }
    vxReleaseImage(&img2);
    vxReleaseImage(&img3);
    vxReleaseScalar(&sleep_time_1);
    vxReleaseScalar(&sleep_time_2);
    vxReleaseScalar(&sleep_time_3);
    vxReleaseNode(&node_1);
    vxReleaseNode(&node_2);
    vxReleaseNode(&node_3);
    vxReleaseGraph(&graph);
    kernel_remove(context);
    vxReleaseContext(&context);

    printf(" Tutorial completed !!! \n");
}

2. In the above graph pipeline, I added another node, NODE 4, that takes the output from NODE 1 and sleeps for 5 seconds. Additionaly, I set the target of the newly added node to be different than NODE 2. I expect the pipeline to take the same data output from NODE 1 and simultaneously execute NODE 2 and NODE 4. If this happens, both the nodes 2 and 4 tries to fetch data from the same data reference queue that is output by NODE 1, and could lead to one of the nodes being blocked. Even in this case, none of the nodes are being blocked. The code is attached. 

#include <stdio.h>
#include <unistd.h>
#include <VX/vx.h>
#include <VX/vx_khr_pipelining.h>
#include <TI/tivx.h>
#include <utility.h>
#include <tivx_openvx_core_kernels.h>


#define IN0_IMG_IDX (0u)
#define SLEEP_PARAM_IDX (1u)
#define OUT0_IMG_IDX (2u)
#define MAX_PARAMS (3u)
#define BUF_SIZE (2u)


static vx_enum kernel_id = (vx_status)VX_ERROR_INVALID_PARAMETERS;
static vx_kernel g_kernel = NULL;


static vx_status VX_CALLBACK kernel_init(vx_node node, vx_reference parameters[], vx_int32 num){
    // printf(" vx_custom_node_1: init SUCCESS ... \n");
    return VX_SUCCESS;
}

static vx_status VX_CALLBACK kernel_run(vx_node node, vx_reference parameters[], vx_uint32 num){
    // vx_image in_image = (vx_image)parameters[IN0_IMG_IDX];
    // vx_image out_image = (vx_image)parameters[OUT0_IMG_IDX];
    vx_scalar sleep_time = (vx_scalar)parameters[SLEEP_PARAM_IDX];
    int t;
    vxCopyScalar(sleep_time, &t, VX_READ_ONLY, VX_MEMORY_TYPE_HOST);
    printf(" \t[ %3.4f ] sleeping for %d seconds \n", tivxPlatformGetTimeInUsecs()/1000000.0, t);
    sleep(t);
    printf(" \t[ %3.4f ] done sleeping for %d seconds \n", tivxPlatformGetTimeInUsecs()/1000000.0, t);
    return VX_SUCCESS;
}

static vx_status VX_CALLBACK kernel_deinit(vx_node node, vx_reference parameters[], vx_uint32 num){
    return VX_SUCCESS;
}

static vx_status VX_CALLBACK kernel_validate(vx_node node,
        const vx_reference parameters[ ],
        vx_uint32 num, vx_meta_format metas[]){
    return VX_SUCCESS;
}

static vx_node get_kernel_node(vx_graph graph, vx_image in, vx_scalar sleep_time, vx_image out){
    vx_node node;
    vx_reference refs[] = {(vx_reference)in, (vx_reference)sleep_time, (vx_reference)out};
    
    node = tivxCreateNodeByKernelEnum(graph,kernel_id,refs,3);
    vxSetReferenceName((vx_reference)node, "NODE");
    return node;
}

static vx_status kernel_create(vx_context context){
    vx_kernel kernel = NULL;
    vx_status status;
    int index=0;

    status = vxAllocateUserKernelId(context, &kernel_id);
    kernel = vxAddUserKernel(
        context,
        "kernel_1",
        kernel_id,
        kernel_run,
        3,
        kernel_validate,
        kernel_init,
        kernel_deinit
    );

    tivxKernelsHostUtilsAddKernelTargetDsp(kernel);
    tivxAddKernelTarget(kernel, TIVX_TARGET_MPU_3);

    // tivxAddKernelTarget(kernel, TIVX_TARGET_DSP1);
    // #ifndef SOC_AM62A
    // tivxAddKernelTarget(kernel, TIVX_TARGET_DSP2);
    // #endif
    // tivxAddKernelTarget(kernel, TIVX_CPU_ID_MPU_0);

    status = vxGetStatus((vx_reference)kernel);

    if(status==VX_SUCCESS){
        status = vxAddParameterToKernel(
            kernel,
            index++,
            (vx_enum)VX_INPUT,
            (vx_enum)VX_TYPE_IMAGE,
            (vx_enum)VX_PARAMETER_STATE_REQUIRED
        );
    }
    printf(" done adding input image parameter \n");
    if(status==VX_SUCCESS){
        status = vxAddParameterToKernel(
            kernel,
            index++,
            (vx_enum)VX_INPUT,
            (vx_enum)VX_TYPE_SCALAR,
            (vx_enum)VX_PARAMETER_STATE_REQUIRED
        );
    }
    printf(" done adding input int parameter \n");
    if(status==VX_SUCCESS){
        status = vxAddParameterToKernel(
            kernel,
            index++,
            (vx_enum)VX_OUTPUT,
            (vx_enum)VX_TYPE_IMAGE,
            (vx_enum)VX_PARAMETER_STATE_REQUIRED
        );
    }
    printf(" done adding output image parameter \n");
    if(status == VX_SUCCESS){
        status = vxFinalizeKernel(kernel);
    }
    if(status != VX_SUCCESS){
        vxReleaseKernel(&kernel);
        kernel = NULL;
    }else{
        g_kernel = kernel;
    }
}

static vx_status kernel_remove(vx_context context){
    vx_status status;
    status = vxRemoveKernel(g_kernel);
    g_kernel = NULL;

    return status;
}


static void add_graph_parameter_by_node_index(vx_graph graph, vx_node node, vx_uint32 node_parameter_index)
{
    vx_parameter parameter = vxGetParameterByIndex(node, node_parameter_index);

    vxAddParameterToGraph(graph, parameter);
    vxReleaseParameter(&parameter);
}

void vx_custom_pipeline_split_data_q(){
    vx_context context;
    vx_graph graph;
    vx_image img1[BUF_SIZE], img2, img3, img4[BUF_SIZE], img5;
    vx_node node_1, node_2, node_3, node_4;
    vx_uint32 width=640, height=480;
    vx_status status;

    vx_uint32 num_buf=BUF_SIZE, pipeline_depth=1, buf_id, loop_id, loop_cnt=2, exe_time;
    vx_graph_parameter_queue_params_t graph_params_list[2];

    printf(" Tutorial started !!! \n");

    context = vxCreateContext();

    status = kernel_create(context);
    printf(" done creating kernel, kernel_id = %d \n", kernel_id);
    
    for(int i=0;i<num_buf;i++){
        img1[i] = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
        img4[i] = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
    }

    img2 = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
    img3 = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);
    img5 = vxCreateImage(context, width, height, (vx_df_image)VX_DF_IMAGE_U8);

    printf(" done initializing image variables \n");
    
    graph = vxCreateGraph(context);
    vx_int32 tmp = 1;
    vx_scalar sleep_time_1 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);
    tmp = 2;
    vx_scalar sleep_time_2 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);
    tmp = 3;
    vx_scalar sleep_time_3 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);
    tmp = 10;
    vx_scalar sleep_time_4 = vxCreateScalar(context, VX_TYPE_INT32, &tmp);

    node_1 = get_kernel_node(graph, img1[0], sleep_time_1, img2);
    node_2 = get_kernel_node(graph, img2, sleep_time_2, img3);
    node_3 = get_kernel_node(graph, img3, sleep_time_3, img4[0]);
    node_4 = get_kernel_node(graph, img2, sleep_time_4, img5);

    printf(" node creation done \n");

    vxSetNodeTarget(node_1, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_MPU_3);
    vxSetNodeTarget(node_2, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_MPU_3);
    vxSetNodeTarget(node_3, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_MPU_3);
    vxSetNodeTarget(node_4, (vx_enum)VX_TARGET_STRING, TIVX_TARGET_DSP_C7_1);
    // TIVX_TARGET_DSP_C7_1

    add_graph_parameter_by_node_index(graph, node_1, 0);
    add_graph_parameter_by_node_index(graph, node_3, 2);

    graph_params_list[0].graph_parameter_index = 0;
    graph_params_list[0].refs_list_size = num_buf;
    graph_params_list[0].refs_list = (vx_reference *)&img1[0];

    graph_params_list[1].graph_parameter_index = 1;
    graph_params_list[1].refs_list_size = num_buf;
    graph_params_list[1].refs_list = (vx_reference *)&img4[0];

    vxSetGraphScheduleConfig(
        graph,
        (vx_enum)VX_GRAPH_SCHEDULE_MODE_QUEUE_AUTO,
        2,
        graph_params_list
    );

    tivxSetGraphPipelineDepth(graph, pipeline_depth);
    tivxSetNodeParameterNumBufByIndex(node_1, 2, num_buf);
    tivxSetNodeParameterNumBufByIndex(node_2, 2, num_buf);
    tivxSetNodeParameterNumBufByIndex(node_4, 2, num_buf);

    status = vxVerifyGraph(graph);

    printf("\n================= Verify graph done =================\n\n");

    exe_time = tivxPlatformGetTimeInUsecs();

    vx_image cur_out_img, cur_in_img;
    uint32_t num_refs;

    /* Running the loop */
    for(buf_id=0;buf_id<BUF_SIZE;buf_id++){
        vxGraphParameterEnqueueReadyRef(graph, 0, (vx_reference *)&img1[buf_id], 1);
        vxGraphParameterEnqueueReadyRef(graph, 1, (vx_reference *)&img4[buf_id], 1);
    }

    for(loop_id=0;loop_id<loop_cnt;loop_id++){
        vx_image cur_out_img=img4[0], cur_in_img=img1[0];
        uint32_t num_refs;
        vxGraphParameterDequeueDoneRef(graph, 0, (vx_reference *)&cur_in_img, 1, &num_refs);
        vxGraphParameterDequeueDoneRef(graph, 1, (vx_reference *)&cur_out_img, 1, &num_refs);
        // printf(" dequeing done \n");

        // Use the output

        if(loop_id>=loop_cnt-num_buf)continue;
        vxGraphParameterEnqueueReadyRef(graph, 0, (vx_reference *)&cur_in_img, 1);
        vxGraphParameterEnqueueReadyRef(graph, 1, (vx_reference *)&cur_out_img, 1);
    }

    vxWaitGraph(graph);

    exe_time = tivxPlatformGetTimeInUsecs() - exe_time;
    printf(" execution time in seconds: %d \n", exe_time/1000000);

    for(int i=0;i<num_buf;i++){
        vxReleaseImage(&img1[i]);
        vxReleaseImage(&img4[i]);
    }
    vxReleaseImage(&img2);
    vxReleaseImage(&img3);
    vxReleaseScalar(&sleep_time_1);
    vxReleaseScalar(&sleep_time_2);
    vxReleaseScalar(&sleep_time_3);
    vxReleaseNode(&node_1);
    vxReleaseNode(&node_2);
    vxReleaseNode(&node_3);
    vxReleaseGraph(&graph);
    kernel_remove(context);
    vxReleaseContext(&context);

    printf(" Tutorial completed !!! \n");
}

Could anyone help me understand why is the application behaving in this way in both these cases?

Also please suggest me a usecase that could lead to blocked nodes.

Thanks in advance!