This thread has been locked.

If you have a related question, please click the "Ask a related question" button in the top right corner. The newly created question will be automatically linked to this question.

RTOS/TMS320C6678: MessageQ Latency

Part Number: TMS320C6678

Tool/software: TI-RTOS

Hi all,

I am working on a benchmark for the MessageQ library. Core 0 is the master and sends out messages to all other cores to do some work. The slaves answer the master with some acknowledgement when they did their work. The MessageQ approach was suggested to me in this post:
https://e2e.ti.com/support/processors/f/791/p/813105/3010666#3010666

I started with the example provided in ti/ipc_3_50_02_02/examples/C6678_bios_elf/ex11_ping but slightly modified it to fit my purpose. For the purpose of prototyping I'm currently creating the same image for all cores and switching take actions for master/slaves based upon the core ID. The only variable I added to the message structure is an enum defining a message type. I went through all suggestions listed in http://processors.wiki.ti.com/index.php/IPC_Users_Guide/Optimizing_IPC_Applications

e.g. disabled all run-time asserts, chose BIOS.LibType_Custom, selected the ti.sdo.ipc.family.c647x.NotifyCircSetup etc. I'm compiling with -O3 and all debug information disabled. I'm using Ipc.ProcSync_PAIR and attaching all slaves to the master and the master to all slaves.

I now measured the number of cycles it takes for (all messages allocated previously and freed afterwards):

- Master sending one message to all cores

- Master receiving the acknowledgement from all slaves

It takes ~63000 cycles to do this exercise (second run)! However, the work all slaves are doing is expected to be between 4000 to 20,000 cycles. I expected the MessageQ approach to be much faster as I'm barely sending any data around.

My questions:

- Is this time expected?

- How can I improve the performance?

- Is there any benchmark available for the master-slave approach?

It would be great if you could give me a few more additional hints or point me to some suggestions. I also carefully pasted all my playground-code into one file and attached it including the cfg file.

Thank you very much for your help.

#include <xdc/std.h>
#include <xdc/cfg/global.h>

#include <xdc/runtime/System.h>
#include <xdc/runtime/IHeap.h>

#include <ti/ipc/SharedRegion.h>
#include <ti/ipc/Notify.h>
#include <ti/ipc/Ipc.h>
#include <ti/ipc/MultiProc.h>
#include <ti/ipc/MessageQ.h>
#include <ti/ipc/SharedRegion.h>
#include <ti/platform/platform.h>
#include <ti/sysbios/BIOS.h>
#include <ti/csl/csl_chip.h>

#include <stdlib.h>
#include <c6x.h>
#include <stdio.h>

//#include "../include/Ipc.h"
//#include "../include/Ipc_common.h"

#define DEBUGLEVEL 1
#define PROFILING
//#define ALLOCMSG       // allocate a new message on each MessageQ_put
#define MAX_NUM_CORES			maxNumCores
#define NUM_SLAVES              7
#define NUM_MESSAGES			16
#define MASTER_CORE				0
#define TOKEN_START_CORE		0
#define HEAP_ID					0

/* Define custom message structure */
typedef enum {
    MSG_TOKEN = 0,
    MSG_ACK   = 1,
    MSG_DONE  = 2
}msgType;

typedef struct myMsg {
    MessageQ_MsgHeader header;
    msgType messageType;
}myMsg;

/* Global Variables */
Char localQueueName[10];
Char remoteQueueName[10];
Char msgQueueNames[MAX_NUM_CORES][10];
UInt numCores = 0;
Uint16 selfId;
const Uint16 masterId = 0;
MessageQ_Handle messageQ = NULL;

/* MessageQ Utils */
void receive_acknowledgement(void);
void send_msg_to_slaves(MessageQ_Msg *msg, MessageQ_QueueId *msgQueueIds);
void set_msg_type(MessageQ_Msg *msg, msgType message_type);

/* IPC Utils */
void attach_master();
void attach_slave();

/* Other Utils */
void init_timer(void);
void print_timing_info(const int n_ops_total, const long int elapsed_ticks, const char* name);


/******************************************************************************
 * MAIN FUNCTION
 *****************************************************************************/
Int main(Int argc, Char* argv[]) {
	selfId = CSL_chipReadReg(CSL_CHIP_DNUM);
	if (numCores == 0){
		numCores = MultiProc_getNumProcessors();
#ifdef ASSERTS
		assert(numCores == MAX_NUM_CORES);
#endif
	}

	if (selfId > 0) {
	    attach_slave();
	} else {
	    attach_master();
	}

	/* Create a MessageQ */
#if DEBUGLEVEL >= 4
	System_printf("Creating a local queue for core id %d\n", selfId);
#endif
	System_sprintf(localQueueName, "%s", MultiProc_getName(MultiProc_self()));// this ensure that we have a unique Q name
	messageQ = MessageQ_create(localQueueName, NULL);
	if (messageQ == NULL){
		System_abort("MessageQ_create failed\n");
	}

	BIOS_start();

	return (0);
}

/******************************************************************************
 * TASK FUNCTION
 *****************************************************************************/
void task_fxn(UArg arg0, UArg arg1){
	Int 				status;
	Int 				coreCount;
	MessageQ_Msg 		msg;
	MessageQ_QueueId 	msgQueueIds[MAX_NUM_CORES];

	/* Register this heap with the Message Q */
	MessageQ_registerHeap((IHeap_Handle)SharedRegion_getHeap((UInt16)0), HEAP_ID);

	if (selfId == masterId) {
	    /* Master: open a MsgQueue on each slave core for sending requests */
	    for (coreCount = 1; coreCount < MAX_NUM_CORES; coreCount++) {
            System_sprintf(remoteQueueName, "%s", MultiProc_getName((UInt16)coreCount));
            do {
                status = MessageQ_open(remoteQueueName, &msgQueueIds[coreCount]);
                if (status < 0) {
                    Task_sleep(1);
                }
            } while (status < 0);
	    }
	} else {
	    /* Slave: open a MsgQueue on master core for sending acknowledgement */
        System_sprintf(remoteQueueName, "%s", MultiProc_getName((UInt16)0));
        do {
            status = MessageQ_open(remoteQueueName, &msgQueueIds[masterId]);
            if (status < 0){
                Task_sleep(1);
            }
        } while (status < 0);
	}

    if (selfId == masterId) {
#ifdef PROFILING
        int t0, t1;
        init_timer();
#endif
        /* Allocate messages to all slaves once */
        MessageQ_Msg msg_array[NUM_SLAVES];
        int i;
        for (i = 0; i < NUM_SLAVES; i++) {
            msg_array[i] = MessageQ_alloc(HEAP_ID, sizeof(myMsg));
            if (msg_array[i] == NULL){
                System_abort("MessageQ Alloc Failed\n");
            }
        }
#if DEBUGLEVEL >= 1
        System_printf("Master: Starting message round 1\n");
#endif
        /* Send a MSG_TOKEN to all slave cores */
        set_msg_type(msg_array, MSG_TOKEN);
#ifdef PROFILING
        t0 = _itoll(TSCH, TSCL);
#endif
        send_msg_to_slaves(msg_array, msgQueueIds);
        receive_acknowledgement();
#ifdef PROFILING
        t1 = _itoll(TSCH, TSCL);
        print_timing_info(1, (t1 - t0), "Msg sent and acknowledged 1");
#endif
#if DEBUGLEVEL >= 1
        System_printf("Master: Finished round 1 starting round 2\n");
#endif
#ifdef PROFILING
        t0 = _itoll(TSCH, TSCL);
#endif
        send_msg_to_slaves(msg_array, msgQueueIds);
        receive_acknowledgement();
#ifdef PROFILING
        t1 = _itoll(TSCH, TSCL);
        print_timing_info(1, (t1 - t0), "Msg sent and acknowledged 2");
#endif
#if DEBUGLEVEL >= 1
        System_printf("Master: Finished round 2 shutting down\n");
#endif
        /* Send a message to all slaves to shut down */
        set_msg_type(msg_array, MSG_DONE);
        send_msg_to_slaves(msg_array, msgQueueIds);
        receive_acknowledgement();
        BIOS_exit(0);

    } else {

        /* Slaves: Allocate a message for acknowledgements */
        MessageQ_Msg ack = MessageQ_alloc(HEAP_ID, sizeof(myMsg));
        if (ack==NULL){
            System_abort("MessageQ Alloc Failed\n");
        }
        ((myMsg*)ack)->messageType = MSG_ACK; // slaves only send acknowledgements
        const MessageQ_QueueId masterQueueId = msgQueueIds[masterId];
        while (TRUE){
            /* Slave: answer on every incoming messages with MSG_ACK */
            MessageQ_get(messageQ, &msg, MessageQ_FOREVER);
#if DEBUGLEVEL >= 3
            System_printf("Slave_%d: Received msgType=%d from master\n",
                          selfId, ((myMsg*)msg)->messageType);
#endif
#ifdef ASSERTS
            assert((((myMsg*)msg)->messageType == MSG_TOKEN) || (((myMsg*)msg)->messageType == MSG_DONE));
#endif
            MessageQ_put(masterQueueId, ack);
            if (((myMsg*)msg)->messageType == MSG_DONE) {
                BIOS_exit(0);
            }
        }
    }
}


/******************************************************************************
 * MessageQ Utils
 *****************************************************************************/
void send_msg_to_slaves(MessageQ_Msg *msg, MessageQ_QueueId *msgQueueIds) {
    /* Without loop: always send 7 requests */
    MessageQ_put(msgQueueIds[1], msg[0]);
    MessageQ_put(msgQueueIds[2], msg[1]);
    MessageQ_put(msgQueueIds[3], msg[2]);
    MessageQ_put(msgQueueIds[4], msg[3]);
    MessageQ_put(msgQueueIds[5], msg[4]);
    MessageQ_put(msgQueueIds[6], msg[5]);
    MessageQ_put(msgQueueIds[7], msg[6]);
}

void set_msg_type(MessageQ_Msg *msg, msgType message_type) {
    int i;
    for (i = 0; i < NUM_SLAVES; i++) {
        ((myMsg*)msg[i])->messageType = message_type;
    }
}

void receive_acknowledgement(void) {
    /* Without loop: always expect 7 answers */
    MessageQ_Msg ack;
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
    MessageQ_get(messageQ, &ack, MessageQ_FOREVER);
#ifdef ASSERTS
    assert(MessageQ_count(messageQ) == 0);
#endif
}


/******************************************************************************
 * Other Utils
 *****************************************************************************/
void init_timer() {
    TSCL = 1;
}

void print_timing_info(const int n_ops_total, const long int elapsed_ticks, const char* name) {
    const int n_cycles = (int)(elapsed_ticks);
    System_printf("%s: n cycles = %d\n%s: ops per cycle = %f (n_ops=%d)\n", name, n_cycles,
           name, (((double)n_ops_total) / ((double)n_cycles)), n_ops_total);
}


/******************************************************************************
 * IPC Utils
 *****************************************************************************/
void attach_master()
{
    SharedRegion_Entry entry;
    Int status;
    UInt i;
    UInt16 clusterBaseId = MultiProc_getBaseIdOfCluster();

    /* Call Ipc_start() */
    status = Ipc_start();
    if (status < 0) {
        System_abort("Ipc_start failed!\n");
    }

    /* get region 0 information */
    SharedRegion_getEntry(0, &entry);
    if (entry.isValid == FALSE) {
        return;
    }

    /* Must attach to owner first to get default GateMP */
    if (MultiProc_self() != entry.ownerProcId) {
        do {
            status = Ipc_attach(entry.ownerProcId);
        } while (status < 0);
    }

    /* Loop to attach to all other processors */
    for (i = clusterBaseId; i < (clusterBaseId + MAX_NUM_CORES); i++)
    {
        if ((i == MultiProc_self()) ||
             (i == entry.ownerProcId)) {

            continue;
        }

        if (Notify_numIntLines((UInt16)i) == 0) {
            continue;
        }

        /* call Ipc_attach for every remote processor */
        do {
            status = Ipc_attach((UInt16)i);
        } while (status < 0);
    }
}


void attach_slave(void)
{
    SharedRegion_Entry entry;
    Int status;
    UInt16 clusterBaseId = MultiProc_getBaseIdOfCluster();

    /* Call Ipc_start() */
    status = Ipc_start();
    if (status < 0) {
        System_abort("Ipc_start failed!\n");
    }

    /* get region 0 information */
    SharedRegion_getEntry(0, &entry);
    if (entry.isValid == FALSE) {
        return;
    }

    /* Must attach to owner first to get default GateMP */
    if (MultiProc_self() == entry.ownerProcId) {
        System_abort("Slave is owner for SharedRegion");
    }

    /* Attach to master */
    do {
        status = Ipc_attach((UInt16)masterId);
    } while (status < 0);
}
IpcSharedMem.cfg