attaching simple message to queue



Hi,

I am trying to add an array to a message that will be passed to a queue.

I am using the MessageQ template under IPC, found at New -> CCS Project -> IPC and I/O Examples -> C6678 Examples -> MessageQ.

I modified the message to carry an array of 100 elements. On core 0 I initialize every element of the array to 1,

and each of the other cores is supposed to increment every element by one, so I expect

every element of the array to have a value of 8 at the end of the process.
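
The expected result described above can be checked with a single-process simulation. This is only a sketch: `Payload`, `core0_init`, `core_increment`, and `simulate_ring` are hypothetical names that stand in for the message payload and the per-core loops, and no IPC or MessageQ calls are involved.

```c
#include <stddef.h>

#define ARRAY_SIZE 100
#define NUM_CORES  8

/* Hypothetical stand-in for the payload part of the message. */
typedef struct {
    int var[ARRAY_SIZE];
} Payload;

/* Core 0 sets every element to 1 before the first MessageQ_put. */
void core0_init(Payload *p)
{
    for (size_t i = 0; i < ARRAY_SIZE; i++) {
        p->var[i] = 1;
    }
}

/* Each of cores 1..7 adds one to every element after MessageQ_get. */
void core_increment(Payload *p)
{
    for (size_t i = 0; i < ARRAY_SIZE; i++) {
        p->var[i] = p->var[i] + 1;
    }
}

/* Simulate one pass around the ring of cores.
 * Returns 1 if every element ends up equal to NUM_CORES, else 0. */
int simulate_ring(Payload *p)
{
    core0_init(p);
    for (int core = 1; core < NUM_CORES; core++) {
        core_increment(p);
    }
    for (size_t i = 0; i < ARRAY_SIZE; i++) {
        if (p->var[i] != NUM_CORES) {
            return 0;
        }
    }
    return 1;
}
```

If the real multicore run leaves some elements at 1 while this simulation ends with every element at 8, the increment logic itself is fine and the problem lies in how the message is allocated or shared between cores.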

I define and allocate the message as follows:

typedef struct MyMessg{
    MessageQ_Msg msg;
    int var[ARRAY_SIZE];
}MyMsg;

 msg = (MyMessage)MessageQ_alloc(HEAPID, sizeof(MyMessage));
 if (msg == NULL) {
     System_abort("MessageQ_alloc failed\n" );
 }

Before MessageQ_put in core 0 I modify the array as follows:

            for(i=0;i<ARRAY_SIZE;i++){
                (*msg).var[i]=1;
            }

and in cores 1-7, just after MessageQ_get:

            for(i=0;i<ARRAY_SIZE;i++){
                (*msg).var[i]=(*msg).var[i] + 1;
            }

The result of running this code is that only some elements

of (*msg).var, roughly the first 8, are set to 8, while

the others are still set to 1.

I have attached the project for clarity.

1680.test_template_messageq.zip

Is this the correct way to add information to a message?

Julian

  • Hi,

    I think your problem might be caused by cache coherency. CPU 0 changes the values in your array, but if the array is cached, it must be written back to shared memory so that CPU 1 can read the correct values.

    Try putting

    Cache_wbInv(msg->var, sizeof(msg->var), Cache_Type_ALL, TRUE);

    before MessageQ_put in core 0.

    On core 1 do:

    Cache_inv(msg->var, sizeof(msg->var), Cache_Type_ALL, TRUE);

    after MessageQ_get.

    Also read these two articles about cache (pt1 and pt2).

    BR
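
A note on the length argument of the cache calls suggested above: as far as I know, the SYS/BIOS `Cache_wbInv` and `Cache_inv` functions take a byte count, not an element count. The sketch below only shows how that byte count could be computed; it makes no cache calls. `Payload`, `payload_byte_count`, and `round_up_to_line` are hypothetical names, and the 128-byte line size used in the usage note is an assumption that should be checked against the device's cache documentation.

```c
#include <stddef.h>

#define ARRAY_SIZE 100

/* Hypothetical stand-in for the payload part of the message. */
typedef struct {
    int var[ARRAY_SIZE];
} Payload;

/* Number of bytes occupied by var[]. This is the value a byte-count
 * argument needs, as opposed to ARRAY_SIZE (the element count). */
size_t payload_byte_count(const Payload *p)
{
    return sizeof(p->var);
}

/* Round a byte count up to a whole number of cache lines.
 * line_size is an assumed parameter supplied by the caller. */
size_t round_up_to_line(size_t bytes, size_t line_size)
{
    return (bytes + line_size - 1) / line_size * line_size;
}
```

For example, with 4-byte `int` values, 100 elements occupy 400 bytes, and `round_up_to_line(400, 128)` gives 512. Passing `ARRAY_SIZE` (100) as the byte count would cover only the first 25 elements.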

  • Julian,

    The attached code does something similar to what you are trying to do. I only tried an array size of 20, but it should work for bigger sizes. Hope it helps.

    Xiaohui

    /* 
     * Copyright (c) 2012, Texas Instruments Incorporated
     * All rights reserved.
     *
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions
     * are met:
     *
     * *  Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     * *  Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the distribution.
     *
     * *  Neither the name of Texas Instruments Incorporated nor the names of
     *    its contributors may be used to endorse or promote products derived
     *    from this software without specific prior written permission.
     *
     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
     * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
     * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
     * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
     * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
     * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
     * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     * */
    /*
     *  ======== message_multicore.c ========
     *  Multiprocessor MessageQ example
     *
     *  This is an example program that uses MessageQ to pass a message
     *  from one processor to another.
     *
     *  Each processor creates its own MessageQ first and then will try to open
     *  a remote processor's MessageQ.  
     *
     *  See message_multicore.k file for expected output.
     */
    
    #include <xdc/std.h>
    #include <string.h>
    
    /*  -----------------------------------XDC.RUNTIME module Headers    */
    #include <xdc/runtime/System.h>
    #include <xdc/runtime/IHeap.h>
    
    /*  ----------------------------------- IPC module Headers           */
    #include <ti/ipc/Ipc.h>
    #include <ti/ipc/MessageQ.h>
    #include <ti/ipc/HeapBufMP.h>
    #include <ti/ipc/MultiProc.h>
    
    /*  ----------------------------------- BIOS6 module Headers         */
    #include <ti/sysbios/BIOS.h>
    #include <ti/sysbios/knl/Task.h>
    
    /*  ----------------------------------- To get globals from .cfg Header */
    #include <xdc/cfg/global.h>
    
    #define HEAP_NAME   "myHeapBuf"
    #define HEAPID      0
    #define NUMLOOPS    10
    
    Char localQueueName[10];
    Char nextQueueName[10];
    UInt16 nextProcId;
    
    #define ARRAY_SIZE 20
    typedef struct MyMessg{
        MessageQ_MsgHeader header;
        int var[ARRAY_SIZE];
    }MyMsg;
    
    
    /*
     *  ======== tsk0_func ========
     *  Allocates a message and ping-pongs the message around the processors.
     *  A local message queue is created and a remote message queue is opened.
     *  Messages are sent to the remote message queue and retrieved from the
     *  local MessageQ.
     */
    Void tsk0_func(UArg arg0, UArg arg1)
    {
        MyMsg           *msg;
        MessageQ_Handle  messageQ;
        MessageQ_QueueId remoteQueueId;    
        Int              status;
        UInt16           msgId = 0;
        HeapBufMP_Handle              heapHandle;
        HeapBufMP_Params              heapBufParams;
        Int              i;
    
        if (MultiProc_self() == 0) {
            /* 
             *  Create the heap that will be used to allocate messages.
             */     
            HeapBufMP_Params_init(&heapBufParams);
            heapBufParams.regionId       = 0;
            heapBufParams.name           = HEAP_NAME;
            heapBufParams.numBlocks      = 1;
            heapBufParams.blockSize      = sizeof(MessageQ_MsgHeader);
            heapHandle = HeapBufMP_create(&heapBufParams);
            if (heapHandle == NULL) {
                System_abort("HeapBufMP_create failed\n" );
            }
        }
        else {
            /* Open the heap created by the other processor. Loop until opened. */
            do {
                status = HeapBufMP_open(HEAP_NAME, &heapHandle);
                /* 
                 *  Sleep for 1 clock tick to avoid inundating remote processor
                 *  with interrupts if open failed
                 */
                if (status < 0) { 
                    Task_sleep(1);
                }
            } while (status < 0);
        }
        
        /* Register this heap with MessageQ */
        MessageQ_registerHeap((IHeap_Handle)heapHandle, HEAPID);
    
        /* Create the local message queue */
        messageQ = MessageQ_create(localQueueName, NULL);    
        if (messageQ == NULL) {
            System_abort("MessageQ_create failed\n" );
        }
        
        /* Open the remote message queue. Spin until it is ready. */
        do {
            status = MessageQ_open(nextQueueName, &remoteQueueId); 
            /* 
             *  Sleep for 1 clock tick to avoid inundating remote processor
             *  with interrupts if open failed
             */
            if (status < 0) { 
                Task_sleep(1);
            }
        } while (status < 0);
        
        if (MultiProc_self() == 0) {
            /* Allocate a message to be ping-ponged around the processors */
            msg = (MyMsg *)MessageQ_alloc(HEAPID, sizeof(MyMsg));
            System_printf("MyMsg Size: %d\n", sizeof(MyMsg) );
    
            if (msg == NULL) {
               System_abort("MessageQ_alloc failed\n" );
            }
            for(i=0; i<ARRAY_SIZE; i++)
               msg->var[i]=0;
    
            /* 
             *  Send the message to the next processor and wait for a message
             *  from the previous processor.
             */
            System_printf("Start the main loop\n");
            while (msgId < NUMLOOPS) {     
                /* Increment...the remote side will check this */
                msgId++;
                MessageQ_setMsgId(msg, msgId);
                
                System_printf("Sending a message #%d to %s\n", msgId, nextQueueName);
                
                /* send the message to the remote processor */
                status = MessageQ_put(remoteQueueId, (MessageQ_Msg)msg);
                if (status < 0) {
                   System_abort("MessageQ_put had a failure/error\n");        
                }        
                
                /* Get a message */
                status = MessageQ_get(messageQ, (MessageQ_Msg *)&msg, MessageQ_FOREVER);
                if (status < 0) {
                   System_abort("This should not happen since timeout is forever\n");
                }
            }
        }
        else {
            /*
             *  Wait for a message from the previous processor and
             *  send it to the next processor
             */
            System_printf("Start the main loop\n");
            while (TRUE) {
                /* Get a message */
                status = MessageQ_get(messageQ, (MessageQ_Msg *)&msg, MessageQ_FOREVER);
                if (status < 0) {
                   System_abort("This should not happen since timeout is forever\n");
                }
                for(i=0;i<ARRAY_SIZE;i++)
                    msg->var[i]=msg->var[i] + 1;
    
                System_printf("Sending a message #%d to %s\n", MessageQ_getMsgId(msg),
                    nextQueueName);
    
                /* Get the message id */
                msgId = MessageQ_getMsgId(msg);
    
                /* send the message to the remote processor */
                status = MessageQ_put(remoteQueueId, (MessageQ_Msg)msg);  
                if (status < 0) {
                   System_abort("MessageQ_put had a failure/error\n");
                }
                
                /* test done */
                if (msgId >= NUMLOOPS) {
                    break;
                }
            }
        }
        
        System_printf("The test is complete\n");
        BIOS_exit(0);
    }
    
    /*
     *  ======== main ========
     *  Synchronizes all processors (in Ipc_start) and calls BIOS_start
     */
    Int main(Int argc, Char* argv[])
    {
        Int status;
    
        nextProcId = (MultiProc_self() + 1) % MultiProc_getNumProcessors();
        
        /* Generate queue names based on own proc ID and total number of procs */
        System_sprintf(localQueueName, "%s", MultiProc_getName(MultiProc_self()));
        System_sprintf(nextQueueName, "%s",  MultiProc_getName(nextProcId));
        
        /*  
         *  Ipc_start() calls Ipc_attach() to synchronize all remote processors
         *  because 'Ipc.procSync' is set to 'Ipc.ProcSync_ALL' in *.cfg
         */
        status = Ipc_start();
        if (status < 0) {
            System_abort("Ipc_start failed\n");
        }
     
        BIOS_start();
    
        return (0);
    }
    /*
     *  @(#) ti.sdo.ipc.examples.multicore.evm667x; 1, 0, 0, 0,1; 5-22-2012 16:36:06; /db/vtree/library/trees/ipc/ipc-h32/src/ xlibrary
    
     */
    
    

     

  • Johns_ and Xiaohui,

    Thanks for your replies.

    Johns_:

    I followed your advice and added the cache functions, but when I stepped over the Cache_inv call on core 1, the message did not

    load the values from core 0.

    I see that the image_processing demo uses the cache functions, so I am going to read the articles you

    recommended, because it looks like using this memory improves system performance.

    Xiaohui:

    I ran the file you attached and was able to increase the array size beyond 20 by changing

    the following line:

           heapBufParams.blockSize      = sizeof(MessageQ_MsgHeader);

    to:

            heapBufParams.blockSize      = sizeof(MyMsg);

    I also found the error in my own code: I was using a pointer type for the message and calling MessageQ_alloc with

    the size of that pointer, so the allocated message size was always 4 bytes, which is smaller than my message.

    Thank you very much.

    Julian
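
The size mistake described in the last reply can be reproduced in plain C without the IPC headers. This is a minimal sketch under stated assumptions: `FakeMsgHeader` is a hypothetical 32-byte stand-in for `MessageQ_MsgHeader` (the real size may differ), and `MyMsgPtr` is a hypothetical pointer typedef of the kind that leads to the error.

```c
#include <stddef.h>

#define ARRAY_SIZE 100

/* Hypothetical stand-in for MessageQ_MsgHeader; 32 bytes is assumed. */
typedef struct {
    unsigned char reserved[32];
} FakeMsgHeader;

/* The header must be the first field, followed by the payload. */
typedef struct {
    FakeMsgHeader header;
    int var[ARRAY_SIZE];
} MyMsg;

/* Hypothetical pointer typedef. */
typedef MyMsg *MyMsgPtr;

/* Wrong: the size of the pointer type, not of the message. */
size_t wrong_alloc_size(void)
{
    return sizeof(MyMsgPtr);
}

/* Right: the size of the whole message, header plus payload. */
size_t right_alloc_size(void)
{
    return sizeof(MyMsg);
}

/* Returns 1 if a heap block of block_size bytes can hold one MyMsg. */
int block_fits_message(size_t block_size)
{
    return block_size >= sizeof(MyMsg);
}
```

On a 32-bit target, `wrong_alloc_size()` returns 4, which matches the message size of 4 reported above. The same reasoning applies to the heap: a block sized for the header alone does not pass `block_fits_message`, which is why the block size needs to be `sizeof(MyMsg)` for larger arrays.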