AM2634: IPC hangs on callback

Part Number: AM2634
Other Parts Discussed in Thread: SYSCONFIG

I'm using IPC with SDK 8.6 and SysConfig 1.19.0.

My code gets stuck in IpcNotify_isr() when there is a lot of IPC traffic. RPMessage_notifyCallback() tries to handle the messages in the RX buffer: RPMessage_vringIsFullRxBuf() always returns true, but inside RPMessage_recvHandler() the call to RPMessage_allocEndPtMsg() always returns NULL. What is causing this endless loop?
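
For readers following along, the loop described above can be modelled outside the SDK. The sketch below is not TI code: `RxModel`, `model_recv_handler` and `model_notify_callback` are hypothetical names, and the iteration cap exists only so that the model terminates. It assumes, as in the ipc_rpmsg.c posted further down in this thread, that a vring buffer is consumed only when a free local message object could be allocated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the per-core driver state (not an SDK type). */
typedef struct {
    uint32_t freeMsgObjs;   /* local message objects left in the free queue */
    uint32_t fullRxBufs;    /* vring RX buffers filled by the remote core   */
    uint32_t queuedAtEndPt; /* messages waiting for the task to receive     */
} RxModel;

/* Same shape as RPMessage_recvHandler(): take a vring buffer only if a
 * free message object is available. Returns true when progress was made. */
static bool model_recv_handler(RxModel *m)
{
    if (m->freeMsgObjs == 0u) {
        return false; /* allocation failed, the vring buffer stays full */
    }
    m->freeMsgObjs--;
    m->fullRxBufs--;
    m->queuedAtEndPt++;
    return true;
}

/* Same shape as the while loop in RPMessage_notifyCallback(). maxIter caps
 * the model so it terminates; the real loop has no such cap. With
 * stopOnStall set, the loop exits as soon as the handler makes no progress.
 * Returns the number of iterations that made no progress. */
static uint32_t model_notify_callback(RxModel *m, uint32_t maxIter, bool stopOnStall)
{
    uint32_t stalled = 0u;

    assert(m != NULL);
    for (uint32_t iter = 0u; (m->fullRxBufs > 0u) && (iter < maxIter); iter++) {
        if (!model_recv_handler(m)) {
            stalled++;
            if (stopOnStall) {
                break;
            }
        }
    }
    return stalled;
}
```

Starting from two free objects and five full buffers, the uncapped variant keeps spinning once the free queue is empty, because nothing inside the loop returns an object to it. The `stopOnStall` variant only illustrates the alternative shape. Whether leaving the loop early is safe in the real driver depends on the handler being triggered again later; the posted RPMessage_recv() does call RPMessage_notifyCallback() when freeQAllocPending is set, but treat this as an observation about the model, not as a tested patch.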

We are using 2 lockstep cores and this is the syscfg configuration for IPC:

Thanks

Alessandro

  • Hello,

    Please check the ipc_rpmsg_echo example in sdk_examples\drivers\ipc\ipc_rpmsg_echo. It sends 100000u messages.
    How many messages did it take to cause this issue?

    Best Regards,
    Gunjan

  • We used the example as a reference for our design. The problem is not the number of messages but the burst rate: the issue appears when the message rate is very high for a short time. The question is: what causes RPMessage_allocEndPtMsg() to return a null pointer?
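
    One way to see when the allocation can fail, based only on the ipc_rpmsg.c posted further down in this thread: in non-callback mode a message object is taken from the free queue in RPMessage_recvHandler() and is returned only when the application calls RPMessage_recv(). The sketch below models that with a hypothetical fixed-size pool. `MsgPool`, `pool_alloc`, `pool_free`, `failed_allocs_in_burst` and `POOL_SIZE` are illustrative names and values, not taken from the SDK.

    ```c
    #include <assert.h>
    #include <stddef.h>
    #include <stdint.h>

    #define POOL_SIZE 4u /* illustrative only, not the SDK's RPMESSAGE_MAX_LOCAL_MSG_OBJ */

    typedef struct {
        int      objs[POOL_SIZE];     /* backing storage for the message objects */
        int     *freeList[POOL_SIZE]; /* stack of currently free objects         */
        uint32_t numFree;
    } MsgPool;

    static void pool_init(MsgPool *p)
    {
        assert(p != NULL);
        for (uint32_t i = 0u; i < POOL_SIZE; i++) {
            p->freeList[i] = &p->objs[i];
        }
        p->numFree = POOL_SIZE;
    }

    /* Receive-interrupt side: returns NULL once every object is queued at an end point. */
    static int *pool_alloc(MsgPool *p)
    {
        return (p->numFree > 0u) ? p->freeList[--p->numFree] : NULL;
    }

    /* Task side: called after the application has consumed a message. */
    static void pool_free(MsgPool *p, int *obj)
    {
        assert(p->numFree < POOL_SIZE);
        p->freeList[p->numFree++] = obj;
    }

    /* Number of failed allocations when burstLen messages arrive before the task runs once. */
    static uint32_t failed_allocs_in_burst(MsgPool *p, uint32_t burstLen)
    {
        uint32_t failed = 0u;

        for (uint32_t i = 0u; i < burstLen; i++) {
            if (pool_alloc(p) == NULL) {
                failed++;
            }
        }
        return failed;
    }
    ```

    In this model a burst longer than the pool drains it, and every further allocation fails until the task side gives an object back. If the receive interrupt never returns, the task never gets the chance to do so.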

  • Hello,

    Can you share the approximate burst rate at which this issue occurs, so that I can reproduce and debug it?

    Thanks,
    Gunjan

  • I was able to reproduce the issue with a modified version of the rpmsg example. In the example I'm using just 2 cores with a modified syscfg, as shown in the following picture:

    The issue happens if I pause all the cores and then resume them, and it looks like a synchronization issue. In our project the issue happens even without debugging, but in the example I have not yet been able to reproduce the synchronization problem that way.

    ipc_rpmsg_echo.c
    /*
     *  Copyright (C) 2021 Texas Instruments Incorporated
     *
     *  Redistribution and use in source and binary forms, with or without
     *  modification, are permitted provided that the following conditions
     *  are met:
     *
     *    Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     *    Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the
     *    distribution.
     *
     *    Neither the name of Texas Instruments Incorporated nor the names of
     *    its contributors may be used to endorse or promote products derived
     *    from this software without specific prior written permission.
     *
     *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    #include <stdio.h>
    #include <string.h>
    #include <inttypes.h>
    #include <kernel/dpl/ClockP.h>
    #include <kernel/dpl/DebugP.h>
    #include <drivers/ipc_notify.h>
    #include <drivers/ipc_rpmsg.h>
    #include "ti_drivers_open_close.h"
    #include "ti_board_open_close.h"
    
    /* This example shows message exchange between multiple cores.
     *
     * One of the core is designated as the 'main' core
     * and other cores are designated as `remote` cores.
     *
     * The main core initiates IPC with remote core's by sending it a message.
     * The remote cores echo the same message to the main core.
     *
     * The main core repeats this for gMsgEchoCount iterations.
     *
     * In each iteration of message exchange, the message value is incremented.
     *
     * When iteration count reaches gMsgEchoCount, the example is completed.
     *
     */
    
    /* number of iterations of message exchange to do */
    uint32_t gMsgEchoCount = 10000u;
    
    #if defined(SOC_AM243X)
    /* main core that starts the message exchange */
    uint32_t gMainCoreId = CSL_CORE_ID_R5FSS0_0;
    /* remote cores that echo messages from main core, make sure to NOT list main core in this list */
    uint32_t gRemoteCoreId[] = {
        CSL_CORE_ID_M4FSS0_0,
        CSL_CORE_ID_R5FSS0_1,
        CSL_CORE_ID_R5FSS1_0,
        CSL_CORE_ID_R5FSS1_1,
        CSL_CORE_ID_MAX /* this value indicates the end of the array */
    };
    #endif
    
    #if defined (SOC_AM263X)
    /* main core that starts the message exchange */
    uint32_t gMainCoreId = CSL_CORE_ID_R5FSS0_0;
    /* remote cores that echo messages from main core, make sure to NOT list main core in this list */
    uint32_t gRemoteCoreId[] = {
    //    CSL_CORE_ID_R5FSS0_1,
        CSL_CORE_ID_R5FSS1_0,
    //    CSL_CORE_ID_R5FSS1_1,
        CSL_CORE_ID_MAX /* this value indicates the end of the array */
    };
    #endif
    
    #if defined(SOC_AM64X)
    /* main core that starts the message exchange */
    uint32_t gMainCoreId = CSL_CORE_ID_R5FSS0_0;
    /* remote cores that echo messages from main core, make sure to NOT list main core in this list */
    uint32_t gRemoteCoreId[] = {
        CSL_CORE_ID_M4FSS0_0,
        CSL_CORE_ID_R5FSS0_1,
        CSL_CORE_ID_R5FSS1_0,
        CSL_CORE_ID_R5FSS1_1,
        CSL_CORE_ID_A53SS0_0,
        CSL_CORE_ID_MAX /* this value indicates the end of the array */
    };
    #endif
    
    #if defined(SOC_AM273X) || defined(SOC_AWR294X)
    /* main core that starts the message exchange */
    uint32_t gMainCoreId = CSL_CORE_ID_R5FSS0_0;
    /* remote cores that echo messages from main core, make sure to NOT list main core in this list */
    uint32_t gRemoteCoreId[] = {
        CSL_CORE_ID_R5FSS0_1,
        CSL_CORE_ID_C66SS0,
        CSL_CORE_ID_MAX /* this value indicates the end of the array */
    };
    #endif
    
    /*
     * Remote core service end point
     *
     * pick any unique value on that core between 0..RPMESSAGE_MAX_LOCAL_ENDPT-1
     * the value need not be unique across cores
     */
    uint16_t gRemoteServiceEndPt = 13u;
    
    /* maximum size that message can have in this example */
    #define MAX_MSG_SIZE        (64u)
    
    /* Main core ack reply end point
     *
     * pick any unique value on that core between 0..RPMESSAGE_MAX_LOCAL_ENDPT-1
     * the value need not be unique across cores
     */
    #define MAIN_CORE_ACK_REPLY_END_PT  (12U)
    
    #define BURST_NUM 16
    #define REPLAY_NUM 32
    
    /* RPMessage_Object MUST be global or static */
    RPMessage_Object gAckReplyMsgObject;
    
    void ipc_rpmsg_echo_main_core_start(void)
    {
        uint32_t myMsgTxCount = 0u;
        uint32_t myMsgRxCount = 0u;
        RPMessage_CreateParams createParams;
        uint32_t msg, i, numRemoteCores;
        uint64_t curTime;
        char msgBuf[MAX_MSG_SIZE];
        uint32_t currentlySent[4];
        int32_t status;
        uint16_t remoteCoreId, remoteCoreEndPt, msgSize;
    
        RPMessage_CreateParams_init(&createParams);
        createParams.localEndPt = MAIN_CORE_ACK_REPLY_END_PT;
        status = RPMessage_construct(&gAckReplyMsgObject, &createParams);
        DebugP_assert(status==SystemP_SUCCESS);
    
        numRemoteCores = 0;
        for(i=0; gRemoteCoreId[i]!=CSL_CORE_ID_MAX; i++)
        {
            numRemoteCores++;
        }
    
        /* wait for all cores to be ready */
        IpcNotify_syncAll(SystemP_WAIT_FOREVER);
    
        ClockP_usleep(500*1000); /* wait for log messages from remote cores to be flushed, otherwise this delay is not needed */
    
        DebugP_log("[IPC RPMSG ECHO] Message exchange started by main core !!!\r\n");
    
        curTime = ClockP_getTimeUsec();
    
        for(msg=0; msg<gMsgEchoCount; msg++)
        {
            snprintf(msgBuf, MAX_MSG_SIZE-1, "%d", msg);
            msgBuf[MAX_MSG_SIZE-1] = 0;
            msgSize = strlen(msgBuf) + 1; /* count the terminating char as well */
    
            /* send the same messages to all cores */
            for(i=0; gRemoteCoreId[i]!=CSL_CORE_ID_MAX; i++ )
            {
                currentlySent[i] = 0;
                for (int var = 0; var < BURST_NUM; ++var) {
                    status = RPMessage_send(
                        msgBuf, msgSize,
                        gRemoteCoreId[i], gRemoteServiceEndPt,
                        RPMessage_getLocalEndPt(&gAckReplyMsgObject),
                        10);
                    if(status==SystemP_SUCCESS)
                    {
                        currentlySent[i]++;
                    }
                    //DebugP_assert(status==SystemP_SUCCESS);
                    myMsgTxCount++;
                }
            }
            /* wait for response from all cores */
            for(i=0; gRemoteCoreId[i]!=CSL_CORE_ID_MAX; i++ )
            {
                /* set 'msgSize' to size of recv buffer,
                * after return `msgSize` contains actual size of valid data in recv buffer
                */
                msgSize = sizeof(msgBuf);
                for (int var = 0; var < currentlySent[i] * REPLAY_NUM; ++var) {
                    status = RPMessage_recv(&gAckReplyMsgObject,
                        msgBuf, &msgSize,
                        &remoteCoreId, &remoteCoreEndPt,
                        10);
                    if(status==SystemP_SUCCESS)
                        myMsgRxCount++;
                }
            }
        }
    
        curTime = ClockP_getTimeUsec() - curTime;
    
        DebugP_log("[IPC RPMSG ECHO] All echoed messages received by main core from %d remote cores !!!\r\n", numRemoteCores);
        DebugP_log("[IPC RPMSG ECHO] Messages sent to each core = %d \r\n", gMsgEchoCount * BURST_NUM);
    
        DebugP_log("[IPC RPMSG ECHO] Total messages tried to send = %d \r\n", myMsgTxCount);
        DebugP_log("[IPC RPMSG ECHO] Total messages received = %d \r\n", myMsgRxCount);
    
        DebugP_log("[IPC RPMSG ECHO] Number of remote cores = %d \r\n", numRemoteCores);
        DebugP_log("[IPC RPMSG ECHO] Total execution time = %" PRId64 " usecs\r\n", curTime);
        DebugP_log("[IPC RPMSG ECHO] One way message latency = %" PRId32 " nsec\r\n",
            (uint32_t)(curTime*1000u/(gMsgEchoCount*numRemoteCores*2)));
    
        RPMessage_destruct(&gAckReplyMsgObject);
    
        DebugP_log("All tests have passed!!\r\n");
    }
    
    /* RPMessage_Object MUST be global or static */
    static RPMessage_Object gRecvMsgObject;
    
    void ipc_rpmsg_echo_remote_core_start(void)
    {
        int32_t status;
        uint64_t curTime;
        RPMessage_CreateParams createParams;
        char recvMsg[MAX_MSG_SIZE];
        uint16_t recvMsgSize, remoteCoreId, remoteCoreEndPt;
    
        RPMessage_CreateParams_init(&createParams);
        createParams.localEndPt = gRemoteServiceEndPt;
        status = RPMessage_construct(&gRecvMsgObject, &createParams);
        DebugP_assert(status==SystemP_SUCCESS);
    
        /* wait for all cores to be ready */
        IpcNotify_syncAll(SystemP_WAIT_FOREVER);
    
        DebugP_log("[IPC RPMSG ECHO] Remote Core waiting for messages from main core ... !!!\r\n");
    
        /* wait for messages forever in a loop */
        while(1)
        {
            /* set 'recvMsgSize' to size of recv buffer,
            * after return `recvMsgSize` contains actual size of valid data in recv buffer
            */
            recvMsgSize = sizeof(recvMsg);
            status = RPMessage_recv(&gRecvMsgObject,
                recvMsg, &recvMsgSize,
                &remoteCoreId, &remoteCoreEndPt,
                SystemP_WAIT_FOREVER);
            if(status==SystemP_SUCCESS)
            {
                /* echo the same message string as reply */
                for (int var = 0; var < REPLAY_NUM; ++var) {
                    /* send ack to sender CPU at the sender end point */
                    status = RPMessage_send(
                        recvMsg, recvMsgSize,
                        remoteCoreId, remoteCoreEndPt,
                        RPMessage_getLocalEndPt(&gRecvMsgObject),
                        SystemP_WAIT_FOREVER);
                    DebugP_assert(status==SystemP_SUCCESS);
                }
    
                curTime = ClockP_getTimeUsec();
                while(ClockP_getTimeUsec() - curTime < 10000u); /* busy-wait so the core hangs for 10 ms */
            }
        }
        /* This loop will never exit */
    }
    
    void ipc_rpmsg_echo_main(void *args)
    {
        Drivers_open();
        Board_driversOpen();
    
        if(IpcNotify_getSelfCoreId()==gMainCoreId)
        {
            ipc_rpmsg_echo_main_core_start();
        }
        else
        {
            ipc_rpmsg_echo_remote_core_start();
        }
    
        Board_driversClose();
        /* We dont close drivers to let the UART driver remain open and flush any pending messages to console */
        /* Drivers_close(); */
    }
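
    To put rough numbers on the burst in the modified example above: each outer iteration sends BURST_NUM messages to the remote core, and the remote core answers every received message REPLAY_NUM times. The helpers below are a back-of-the-envelope sketch with hypothetical function names, not SDK code. The ring depth of 8 is the default set in RPMessage_Params_init() in the ipc_rpmsg.c posted below and may be overridden by the SysConfig settings.

    ```c
    #include <assert.h>
    #include <stdint.h>

    #define BURST_NUM  16u /* from the posted example */
    #define REPLAY_NUM 32u /* from the posted example */

    /* Replies the remote core sends for one outer-loop iteration,
     * assuming every send from the main core succeeds. */
    static uint32_t replies_per_iteration(uint32_t burst, uint32_t replay)
    {
        /* guard against 32-bit overflow in the product */
        assert((replay == 0u) || (burst <= (UINT32_MAX / replay)));
        return burst * replay;
    }

    /* Replies that cannot be in flight at once for a ring of numBuf buffers. */
    static uint32_t replies_beyond_ring(uint32_t replies, uint32_t numBuf)
    {
        return (replies > numBuf) ? (replies - numBuf) : 0u;
    }
    ```

    With the posted values this gives 512 replies per iteration against 8 ring buffers, so 504 of them have to wait until the main core hands buffers back by calling RPMessage_recv().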
    

    Last but not least, we are using a patched SDK in which the files ipc_rpmsg.c, ipc_rpmsg_priv.h and ipc_rpmsg_vring.c were modified as suggested by TI to avoid another bug.

    3652.ipc_rpmsg.c
    /*
     *  Copyright (C) 2018-2021 Texas Instruments Incorporated
     *
     *  Redistribution and use in source and binary forms, with or without
     *  modification, are permitted provided that the following conditions
     *  are met:
     *
     *    Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     *    Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the
     *    distribution.
     *
     *    Neither the name of Texas Instruments Incorporated nor the names of
     *    its contributors may be used to endorse or promote products derived
     *    from this software without specific prior written permission.
     *
     *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    
    #include <drivers/ipc_rpmsg/ipc_rpmsg_priv.h>
    
    IpcRpmsg_Ctrl gIpcRpmsgCtrl;
    
    RPMessage_LocalMsg *RPMessage_allocEndPtMsg(uint32_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_LocalMsg *pMsg;
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
        pMsg = (RPMessage_LocalMsg*)RPMessage_queueGet(&coreObj->freeQ);
        if(pMsg == NULL)
        {
            coreObj->freeQAllocPending = 1;
        }
        else
        {
            coreObj->freeQAllocPending = 0;
        }
        HwiP_restore(oldIntState);
    
        return pMsg;
    }
    
    uint32_t RPMessage_freeEndPtMsg(uint16_t remoteCoreId, RPMessage_LocalMsg *pMsg)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        uint32_t oldIntState, isAllocPending;
    
        oldIntState = HwiP_disable();
        isAllocPending = coreObj->freeQAllocPending;
        RPMessage_queuePut(&coreObj->freeQ, &pMsg->elem);
        HwiP_restore(oldIntState);
    
        return isAllocPending;
    }
    
    void RPMessage_putEndPtMsg(RPMessage_Struct *obj, RPMessage_LocalMsg *pMsg)
    {
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
        RPMessage_queuePut(&obj->endPtQ, &pMsg->elem);
        HwiP_restore(oldIntState);
    
        SemaphoreP_post(&obj->newEndPtMsgSem);
    }
    
    int32_t RPMessage_getEndPtMsg(RPMessage_Struct *obj, RPMessage_LocalMsg **pMsg, uint32_t timeout)
    {
        uint32_t oldIntState, done;
        int32_t status = SystemP_TIMEOUT;
    
        done = 0;
        do {
            oldIntState = HwiP_disable();
            *pMsg = (RPMessage_LocalMsg*)RPMessage_queueGet(&obj->endPtQ);
            HwiP_restore(oldIntState);
    
            if(*pMsg==NULL)
            {
                status = SemaphoreP_pend(&obj->newEndPtMsgSem, timeout);
                if(status == SystemP_TIMEOUT)
                {
                    done = 1;
                }
                if(status == SystemP_SUCCESS && obj->doRecvUnblock)
                {
                    status = SystemP_TIMEOUT;
                    done = 1;
                }
            }
            else
            {
                status = SystemP_SUCCESS;
                done = 1;
            }
        } while( ! done );
    
        return status;
    }
    
    /* handle one new received message from vring */
    void RPMessage_recvHandler(uint32_t remoteCoreId)
    {
        uint16_t vringBufId;
        int32_t status;
        RPMessage_LocalMsg *pMsg;
    
        /* get a free message pointer to hold vring buf info
         * if no free message pointer then dont remove from vring
         */
        pMsg = RPMessage_allocEndPtMsg(remoteCoreId);
        if(pMsg!=NULL)
        {
            status = RPMessage_vringGetFullRxBuf(remoteCoreId, &vringBufId);
            if(status == SystemP_SUCCESS)
            {
                /* message in vring, extract it and copy info to message pointer and put in end point Q */
                uint8_t *vringBufAddr = RPMessage_vringGetRxBufAddr(remoteCoreId, vringBufId);
                RPMessage_Header *header = (RPMessage_Header *)vringBufAddr;
                uint16_t localEndPt = header->dstEndPt;
    
                status = SystemP_FAILURE;
                if(localEndPt < RPMESSAGE_MAX_LOCAL_ENDPT)
                {
                    RPMessage_Struct *obj = gIpcRpmsgCtrl.localEndPtObj[localEndPt];
                    if(obj!=NULL)
                    {
                        if(obj->recvCallback != NULL)
                        {
                            /* recv messages handled in callback mode */
                            obj->recvCallback( (RPMessage_Object*)obj,
                                obj->recvCallbackArgs,
                                &vringBufAddr[sizeof(RPMessage_Header)],
                                header->dataLen,
                                remoteCoreId,
                                header->srcEndPt
                                );
                            status = SystemP_SUCCESS;
    
                            /* pMsg is not used, free it */
                            RPMessage_freeEndPtMsg(remoteCoreId, pMsg);
                            /* done using vring buf, free it */
                            RPMessage_vringPutEmptyRxBuf(remoteCoreId, vringBufId);
                        }
                        else
                        {
                            /* recv messages handled in non-callback mode */
                            pMsg->remoteCoreId = remoteCoreId;
                            pMsg->vringBufId = vringBufId;
                            RPMessage_putEndPtMsg(obj, pMsg);
                            status = SystemP_SUCCESS;
    
                            if(obj->recvNotifyCallback!=NULL)
                            {
                                obj->recvNotifyCallback((RPMessage_Object*)obj, obj->recvNotifyCallbackArgs);
                            }
                        }
                    }
                }
                if(status!=SystemP_SUCCESS)
                {
                    /* invalid vring message header or invalid endpt
                    * or no object registered for local end pt, so no need handle the message pointer,
                    * free it
                    */
                    RPMessage_vringPutEmptyRxBuf(remoteCoreId, vringBufId);
                }
            }
            if(status!=SystemP_SUCCESS)
            {
                /* no message in vring or invalid vring message header or invalid endpt
                * or no object registered for local end pt, so no need handle the message pointer,
                * free it
                */
                RPMessage_freeEndPtMsg(remoteCoreId, pMsg);
            }
        }
    }
    
    void RPMessage_notifyCallback(uint32_t remoteCoreId, uint16_t localClientId, uint32_t msgValue, void *args)
    {
        if(gIpcRpmsgCtrl.isCoreEnable[remoteCoreId] && gIpcRpmsgCtrl.isCoreInitialized[remoteCoreId])
        {
            uint16_t rxMsgValue = RPMESSAGE_MSG_VRING_NEW_FULL;
    
            if(RPMessage_isLinuxCore(remoteCoreId))
            {
                rxMsgValue = RPMESSAGE_LINUX_RX_VRING_ID; /* In linux, we get RX VRING ID, which is 1 in linux */
            }
            if(msgValue == rxMsgValue)
            {   /* check full ring */
                while(RPMessage_vringIsFullRxBuf(remoteCoreId))
                {
                    RPMessage_recvHandler(remoteCoreId);
                }
            }
            else
            {   /* check empty ring */
    
                /* check if there is any new empty buf, if yes then post semaphore to wake up any waiting threads */
                RPMessage_vringCheckEmptyTxBuf(remoteCoreId);
            }
        }
    }
    
    int32_t RPMessage_send( void*    data,
                            uint16_t dataLen,
                            uint16_t remoteCoreId,
                            uint16_t remoteEndPt,
                            uint16_t localEndPt,
                            uint32_t timeout
                          )
    {
        int32_t status = SystemP_FAILURE;
    
        if(remoteCoreId < CSL_CORE_ID_MAX && gIpcRpmsgCtrl.isCoreEnable[remoteCoreId]
            && data != NULL && dataLen != 0
            )
        {
            uint16_t vringBufId;
    
            status = RPMessage_vringGetEmptyTxBuf(remoteCoreId, &vringBufId, timeout);
            if(status == SystemP_SUCCESS)
            {
                uint8_t *vringBufAddr = RPMessage_vringGetTxBufAddr(remoteCoreId, vringBufId);
                uint16_t vringBufLen = RPMessage_vringGetTxBufLen(remoteCoreId, vringBufId);
                RPMessage_Header *header = (RPMessage_Header *)vringBufAddr;
    
                if(dataLen > (vringBufLen - sizeof(RPMessage_Header)) )
                {
                    dataLen = vringBufLen - sizeof(RPMessage_Header);
    
                    DebugP_logWarn("[IPC RPMSG] Message send to remote core %d @ %d end point truncated due to lack of space in vring buffer !!!\r\n",
                        remoteCoreId, remoteEndPt);
                }
    
                header->srcEndPt = localEndPt;
                header->dstEndPt = remoteEndPt;
                header->srcCoreId = gIpcRpmsgCtrl.selfCoreId;
                header->flags = 0;
                header->dataLen = dataLen;
    
                memcpy( &vringBufAddr[sizeof(RPMessage_Header)], data, dataLen);
    
                status = RPMessage_vringPutFullTxBuf(remoteCoreId, vringBufId, dataLen + sizeof(RPMessage_Header), timeout);
    
                if(status != SystemP_SUCCESS)
                {
                    DebugP_logWarn("[IPC RPMSG] Message send to remote core %d @ %d end point failed due to lack of space in Notify Queue !!!\r\n",
                    remoteCoreId, remoteEndPt);
                }
            }
            else
            {
                DebugP_logWarn("[IPC RPMSG] Message send to remote core %d @ %d end point failed due to lack of space in vring !!!\r\n",
                    remoteCoreId, remoteEndPt);
            }
        }
        else
        {
            DebugP_logError("[IPC RPMSG] Message send to remote core %d @ %d end point failed due to invalid parameters !!!\r\n",
                remoteCoreId, remoteEndPt
                );
        }
        return status;
    }
    
    int32_t RPMessage_recv(RPMessage_Object *handle, void* data, uint16_t *dataLen,
                          uint16_t *remoteCoreId, uint16_t *remoteEndPt, uint32_t timeout)
    {
        int32_t status = SystemP_FAILURE;
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        if( data != NULL && dataLen != NULL && remoteCoreId != NULL && remoteEndPt != NULL
            && obj->recvCallback == NULL /* i.e non-callback mode */
          )
        {
            RPMessage_LocalMsg *pMsg;
    
            status = RPMessage_getEndPtMsg(obj, &pMsg, timeout);
            if(status == SystemP_SUCCESS && pMsg != NULL)
            {
                uint32_t isAllocPending = 0;
                uint16_t vringBufId = pMsg->vringBufId;
                uint8_t *vringBufAddr = RPMessage_vringGetRxBufAddr(pMsg->remoteCoreId, vringBufId);
                RPMessage_Header *header = (RPMessage_Header *)vringBufAddr;
    
                *remoteCoreId = pMsg->remoteCoreId;
                *remoteEndPt =  header->srcEndPt;
    
                if( *dataLen < header->dataLen )
                {
                    DebugP_logWarn("[IPC RPMSG] Message recv @ %d local end point truncated due to insufficient user buffer size !!!\r\n",
                        obj->localEndPt
                        );
                }
                else
                {
                    *dataLen = header->dataLen;
                }
    
                memcpy( data, &vringBufAddr[sizeof(RPMessage_Header)], *dataLen);
    
                RPMessage_vringPutEmptyRxBuf(*remoteCoreId, vringBufId);
                isAllocPending = RPMessage_freeEndPtMsg(*remoteCoreId, pMsg);
                if(isAllocPending)
                {   /* if any messages are pending message pointer due to free Q being empty,
                     * now there will be atleast one element to handle any pending vring requests.
                     * So check vring and handle pending messages if any
                     */
                    RPMessage_notifyCallback(*remoteCoreId,
                        IPC_NOTIFY_CLIENT_ID_RPMSG,
                        RPMESSAGE_MSG_VRING_NEW_FULL,
                        NULL);
                }
            }
            else
            {
                if(status != SystemP_TIMEOUT)
                {
                    DebugP_logError("[IPC RPMSG] Message recv @ %d local end point failed due to invalid end point Q !!!\r\n",
                        obj->localEndPt
                        );
                }
            }
        }
        else
        {
            DebugP_logError("[IPC RPMSG] Message recv @ %d local end point failed due to invalid parameters !!!\r\n",
                obj->localEndPt
                );
        }
        return status;
    }
    
    void RPMessage_unblock(RPMessage_Object *handle)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        obj->doRecvUnblock = 1;
        SemaphoreP_post(&obj->newEndPtMsgSem);
    }
    
    uint16_t RPMessage_getLocalEndPt(const RPMessage_Object *handle)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        return obj->localEndPt;
    }
    
    int32_t RPMessage_construct(RPMessage_Object *handle, const RPMessage_CreateParams *createParams)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
        int32_t status = SystemP_FAILURE;
    
        DebugP_assert(sizeof(RPMessage_Object) >= sizeof(RPMessage_Struct));
    
        if(createParams->localEndPt < RPMESSAGE_MAX_LOCAL_ENDPT
            && gIpcRpmsgCtrl.localEndPtObj[createParams->localEndPt] == NULL)
        {
            obj->localEndPt = createParams->localEndPt;
            obj->recvCallback = createParams->recvCallback;
            obj->recvCallbackArgs = createParams->recvCallbackArgs;
            obj->recvNotifyCallback = createParams->recvNotifyCallback;
            obj->recvNotifyCallbackArgs = createParams->recvNotifyCallbackArgs;
            obj->doRecvUnblock = 0;
            RPMessage_queueReset(&obj->endPtQ);
            SemaphoreP_constructBinary(&obj->newEndPtMsgSem, 0);
    
            gIpcRpmsgCtrl.localEndPtObj[createParams->localEndPt] = obj;
    
            status = SystemP_SUCCESS;
        }
        return status;
    }
    
    void RPMessage_destruct(RPMessage_Object *handle)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        if(obj->localEndPt < RPMESSAGE_MAX_LOCAL_ENDPT &&
            gIpcRpmsgCtrl.localEndPtObj[obj->localEndPt] != NULL)
        {
            gIpcRpmsgCtrl.localEndPtObj[obj->localEndPt] = NULL;
    
            obj->localEndPt = RPMESSAGE_MAX_LOCAL_ENDPT;
            obj->recvCallback = NULL;
            obj->recvCallbackArgs = NULL;
            obj->doRecvUnblock = 0;
            RPMessage_queueReset(&obj->endPtQ);
            SemaphoreP_destruct(&obj->newEndPtMsgSem);
        }
    }
    
    void RPMessage_CreateParams_init(RPMessage_CreateParams *params)
    {
        params->localEndPt = RPMESSAGE_MAX_LOCAL_ENDPT;
        params->recvCallback = NULL;
        params->recvCallbackArgs = NULL;
        params->recvNotifyCallback = NULL;
        params->recvNotifyCallbackArgs = NULL;
    }
    
    void RPMessage_Params_init(RPMessage_Params *params)
    {
        uint16_t coreId;
    
        memset(params, 0, sizeof(RPMessage_Params));
    
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            params->vringTxBaseAddr[coreId] = RPMESSAGE_VRING_ADDR_INVALID;
            params->vringRxBaseAddr[coreId] = RPMESSAGE_VRING_ADDR_INVALID;
        }
        params->vringNumBuf = 8;
        params->vringMsgSize = 128;
        params->vringSize = RPMESSAGE_VRING_SIZE(params->vringNumBuf, params->vringMsgSize);
        params->linuxCoreId = CSL_CORE_ID_MAX;
        params->linuxResourceTable = NULL;
    }
    
    int32_t  RPMessage_coreInit(uint16_t remoteCoreId, const RPMessage_Params *params)
    {
        int32_t status = SystemP_SUCCESS;
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        uint16_t elemId;
    
        SemaphoreP_constructBinary(&coreObj->newEmptyVringBufSem, 0);
        coreObj->freeQAllocPending = 0;
        RPMessage_queueReset(&coreObj->freeQ);
        for(elemId=0; elemId<RPMESSAGE_MAX_LOCAL_MSG_OBJ; elemId++)
        {
            RPMessage_queuePut(&coreObj->freeQ, &coreObj->localMsgObj[elemId].elem);
        }
        /* Linux VRINGs we will init later inside RPMessage_waitForLinuxReady() */
        if(gIpcRpmsgCtrl.isCoreEnable[remoteCoreId] && !RPMessage_isLinuxCore(remoteCoreId))
        {
            /* reset RX ring */
            RPMessage_vringReset(remoteCoreId, 0, params);
            /* reset TX ring */
            RPMessage_vringReset(remoteCoreId, 1, params);
    
            /* mark core data structure as initialized, now we can handle interrupts */
            gIpcRpmsgCtrl.isCoreInitialized[remoteCoreId] = 1;
        }
        return status;
    }
    
    void RPMessage_coreDeInit(uint16_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
    
        SemaphoreP_destruct(&coreObj->newEmptyVringBufSem);
        coreObj->freeQAllocPending = 0;
        RPMessage_queueReset(&coreObj->freeQ);
    }
    
    void RPMessage_forceRecvMsgHandlers(void)
    {
        uint16_t coreId;
    
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            RPMessage_notifyCallback(coreId,
                IPC_NOTIFY_CLIENT_ID_RPMSG,
                RPMESSAGE_MSG_VRING_NEW_FULL,
                NULL);
        }
    }
    
    void RPMessage_controlEndPtHandler(RPMessage_Object *obj, void *arg,
            void *data, uint16_t dataLen,
            uint16_t remoteCoreId, uint16_t remoteEndPt)
    {
        if(gIpcRpmsgCtrl.controlEndPtCallback)
        {
            /* check if message is of correct size */
            if(dataLen == sizeof(RPMessage_AnnounceMsg))
            {
                /* invoke user callback */
                RPMessage_AnnounceMsg *pMsg = (RPMessage_AnnounceMsg*)data;
    
                gIpcRpmsgCtrl.controlEndPtCallback(
                    gIpcRpmsgCtrl.controlEndPtCallbackArgs,
                    remoteCoreId,
                    pMsg->remoteEndPt,
                    pMsg->name
                    );
            }
        }
    }
    
    int32_t RPMessage_controlEndPtInit(void)
    {
        RPMessage_CreateParams createPrms;
        int32_t status;
    
        RPMessage_CreateParams_init(&createPrms);
        createPrms.localEndPt = RPMESSAGE_CTRL_ENDPOINT_ID;
        createPrms.recvCallback = RPMessage_controlEndPtHandler;
        status = RPMessage_construct(&gIpcRpmsgCtrl.controlEndPtObj, &createPrms);
    
        return status;
    }
    
    void RPMessage_controlEndPtDeInit(void)
    {
        RPMessage_destruct(&gIpcRpmsgCtrl.controlEndPtObj);
    }
    
    int32_t  RPMessage_announce(uint16_t remoteCoreId, uint16_t localEndPt, const char* name)
    {
        int32_t status;
        RPMessage_AnnounceMsg msg;
    
        msg.type = 0;
        msg.remoteEndPt = localEndPt; /* local end point will be remote end point for the other side */
        strncpy(msg.name, name, RPMESSAGE_ANNOUNCE_SERVICENAME_LEN-1);
        msg.name[RPMESSAGE_ANNOUNCE_SERVICENAME_LEN-1] = '\0';
    
        status = RPMessage_send(
                    &msg,
                    sizeof(RPMessage_AnnounceMsg),
                    remoteCoreId,
                    RPMESSAGE_CTRL_ENDPOINT_ID, /* control end point on remote side */
                    RPMESSAGE_CTRL_ENDPOINT_ID, /* reply or local end point, set also to control end point */
                    SystemP_WAIT_FOREVER /* wait until message is put in VRING */
        );
        return status;
    }
    
    void RPMessage_controlEndPtCallback(RPMessage_ControlEndPtCallback controlEndPtCallback,
        void  *controlEndPtCallbackArgs)
    {
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
    
        gIpcRpmsgCtrl.controlEndPtCallback = controlEndPtCallback;
        gIpcRpmsgCtrl.controlEndPtCallbackArgs = controlEndPtCallbackArgs;
    
        HwiP_restore(oldIntState);
    }
    
    uint32_t RPMessage_isLinuxCore(uint16_t coreId)
    {
        uint32_t isLinuxCore = 0;
    
        if(coreId == gIpcRpmsgCtrl.linuxCoreId && gIpcRpmsgCtrl.linuxResourceTable)
        {
            isLinuxCore = 1;
        }
        return isLinuxCore;
    }
    
    int32_t  RPMessage_waitForLinuxReady(uint32_t timeout)
    {
        int32_t status = SystemP_FAILURE;
        volatile RPMessage_ResourceTable *rscTable = (RPMessage_ResourceTable *)gIpcRpmsgCtrl.linuxResourceTable;
    
        if(rscTable)
        {
            uint32_t elaspedTicks, startTicks = ClockP_getTicks();
            do
            {
                CacheP_inv((void*)rscTable, sizeof(RPMessage_ResourceTable), CacheP_TYPE_ALL);
                if(rscTable->vdev.status == 0x7U)
                {
                    /* linux has initialized the resource table, break out */
                    status = SystemP_SUCCESS;
                }
                if(status != SystemP_SUCCESS)
                {
                    elaspedTicks = ClockP_getTicks() - startTicks;
                    if( elaspedTicks >= timeout)
                    {
                        /* timeout, linux did not init the resource table within the user specified timeout */
                        status = SystemP_TIMEOUT;
                    }
                    if(status != SystemP_TIMEOUT)
                    {
                        /* sleep one tick */
                        ClockP_usleep(ClockP_ticksToUsec(1));
                    }
                }
            } while(status == SystemP_FAILURE);
    
            if(status == SystemP_SUCCESS)
            {
                /* init virtio on linux side */
    
                /* initialize RX VRING */
                RPMessage_vringResetLinux(
                        gIpcRpmsgCtrl.linuxCoreId,
                        0,
                        gIpcRpmsgCtrl.linuxResourceTable);
                /* initialize TX VRING */
                RPMessage_vringResetLinux(
                        gIpcRpmsgCtrl.linuxCoreId,
                        1,
                        gIpcRpmsgCtrl.linuxResourceTable);
    
                /* mark core data structure as initialized, now we can handle interrupts */
                gIpcRpmsgCtrl.isCoreInitialized[gIpcRpmsgCtrl.linuxCoreId] = 1;
            }
        }
    
        return status;
    }
    
    int32_t  RPMessage_init(const RPMessage_Params *params)
    {
        int32_t status = SystemP_SUCCESS;
        uint16_t coreId, localEndPtId;
    
        gIpcRpmsgCtrl.selfCoreId = IpcNotify_getSelfCoreId();
        gIpcRpmsgCtrl.controlEndPtCallback = NULL;
        gIpcRpmsgCtrl.controlEndPtCallbackArgs = NULL;
        gIpcRpmsgCtrl.linuxResourceTable = params->linuxResourceTable;
        gIpcRpmsgCtrl.linuxCoreId = params->linuxCoreId;
        for(localEndPtId = 0; localEndPtId < RPMESSAGE_MAX_LOCAL_ENDPT; localEndPtId++)
        {
            gIpcRpmsgCtrl.localEndPtObj[localEndPtId] = NULL;
        }
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            /* enable a core for RPMessage only when all of the below are satisfied
             * - valid vring ID is set
             * - not self core ID
             * - IPC Notify with that core is enabled
             */
            gIpcRpmsgCtrl.isCoreEnable[coreId] = 0;
            gIpcRpmsgCtrl.isCoreInitialized[coreId] = 0;
            if(params->vringTxBaseAddr[coreId] != RPMESSAGE_VRING_ADDR_INVALID
                &&
                params->vringRxBaseAddr[coreId] != RPMESSAGE_VRING_ADDR_INVALID
                &&
                coreId != gIpcRpmsgCtrl.selfCoreId
                &&
                IpcNotify_isCoreEnabled(coreId)
              )
            {
                gIpcRpmsgCtrl.isCoreEnable[coreId] = 1;
            }
            if(RPMessage_isLinuxCore(coreId)
                && IpcNotify_isCoreEnabled(coreId)
                )
            {
                gIpcRpmsgCtrl.isCoreEnable[coreId] = 1;
            }
    
        }
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            status |= RPMessage_coreInit(coreId, params);
        }
    
        /* create control end point */
        status |= RPMessage_controlEndPtInit();
    
        IpcNotify_registerClient(IPC_NOTIFY_CLIENT_ID_RPMSG,
            RPMessage_notifyCallback, NULL
            );
    
        return status;
    }
    
    void  RPMessage_deInit(void)
    {
        uint16_t coreId;
    
        IpcNotify_unregisterClient(IPC_NOTIFY_CLIENT_ID_RPMSG);
    
        RPMessage_controlEndPtDeInit();
    
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            RPMessage_coreDeInit(coreId);
        }
    }
    
    
    3652.ipc_rpmsg_priv.h
    3652.ipc_rpmsg_vring.c
    /*
     *  Copyright (C) 2018-2021 Texas Instruments Incorporated
     *
     *  Redistribution and use in source and binary forms, with or without
     *  modification, are permitted provided that the following conditions
     *  are met:
     *
     *    Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     *    Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the
     *    distribution.
     *
     *    Neither the name of Texas Instruments Incorporated nor the names of
     *    its contributors may be used to endorse or promote products derived
     *    from this software without specific prior written permission.
     *
     *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    
    #include <drivers/ipc_rpmsg/ipc_rpmsg_priv.h>
    
    /* NOTE:
     * For RTOS to RTOS IPC RPMessage
     *   - AVAIL in vring = EMPTY in ipc rpmsg implementation
     *   - USED in vring = FULL  in ipc rpmsg implementation
     *
     * For Linux to RTOS IPC RPMessage
     * - When doing TX from RTOS
     *   - AVAIL in vring = EMPTY in ipc rpmsg implementation
     *   - USED in vring = FULL  in ipc rpmsg implementation
     * - When doing RX from RTOS
     *   - AVAIL in vring = FULL in ipc rpmsg implementation
     *   - USED in vring = EMPTY in ipc rpmsg implementation
     *
     * RPMessage_isLinuxCore is used in Rx APIs to switch the meaning when
     * receiving messages from core ID which runs Linux
     */
    
    #define VRING_USED_F_NO_NOTIFY  (1U)
    
    int32_t RPMessage_vringGetEmptyTxBuf(uint16_t remoteCoreId, uint16_t *vringBufId, uint32_t timeout)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
        uint32_t oldIntState;
        uint16_t head;
        int32_t status = SystemP_FAILURE;
        uint32_t done = 0;
    
        oldIntState = HwiP_disable();
    
        do
        {
            /* There's nothing available */
            if (vringObj->lastAvailIdx == vringObj->avail->idx)
            {
                /* We need to know about added buffers */
                vringObj->used->flags &= (uint16_t)~VRING_USED_F_NO_NOTIFY;
    
                HwiP_restore(oldIntState);
    
                status = SemaphoreP_pend(&coreObj->newEmptyVringBufSem, timeout);
                if(status==SystemP_TIMEOUT)
                {
                    done = 1;
                }
    
                oldIntState = HwiP_disable();
            }
            else
            {
                head = vringObj->avail->ring[vringObj->lastAvailIdx % vringObj->vringNumBuf];
                vringObj->lastAvailIdx++;
    
                *vringBufId = head;
                done = 1;
                status = SystemP_SUCCESS;
            }
        } while( ! done );
    
        HwiP_restore(oldIntState);
    
        return status;
    }
    
    int32_t  RPMessage_vringPutFullTxBuf(uint16_t remoteCoreId, uint16_t vringBufId, uint16_t dataLen, uint32_t timeout)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
        struct vring_used_elem *used;
        uint32_t oldIntState;
        uint32_t txMsgValue = RPMESSAGE_MSG_VRING_NEW_FULL;
        int32_t status = SystemP_FAILURE;
        uint32_t elapsedTicks, startTicks;
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            /* for linux we need to send the TX VRING ID in the mailbox message */
            txMsgValue = RPMESSAGE_LINUX_TX_VRING_ID;
        }
    
        oldIntState = HwiP_disable();
    
        used = &vringObj->used->ring[vringObj->used->idx % vringObj->vringNumBuf];
        used->id = vringBufId;
        used->len = dataLen;
        vringObj->used->idx++;
    
        #if defined(__aarch64__) || defined(__arm__)
        __asm__ __volatile__ ( "dsb sy"  "\n\t": : : "memory");
        __asm__ __volatile__ ( "isb sy"  "\n\t": : : "memory");
        #endif
        #if defined(_TMS320C6X)
        _mfence();
        _mfence();
        #endif
    
        HwiP_restore(oldIntState);
    
        startTicks = ClockP_getTicks();
        do
        {
            status = IpcNotify_sendMsg(remoteCoreId, IPC_NOTIFY_CLIENT_ID_RPMSG, txMsgValue, 0);
            elapsedTicks = ClockP_getTicks() - startTicks;
        } while((elapsedTicks < timeout) && (status == SystemP_TIMEOUT));
    
        if(elapsedTicks >= timeout)
        {
            status = SystemP_TIMEOUT;
        }
    
        return status;
    }
    
    void RPMessage_vringCheckEmptyTxBuf(uint16_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
        uint32_t isNewEmptyBuf = 1;
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
    
        if (vringObj->lastAvailIdx == vringObj->avail->idx)
        {
            isNewEmptyBuf = 0;
        }
    
        HwiP_restore(oldIntState);
    
        if(isNewEmptyBuf)
        {
            SemaphoreP_post(&coreObj->newEmptyVringBufSem);
        }
    }
    
    uint8_t *RPMessage_vringGetTxBufAddr(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
    
        return (uint8_t *)vringObj->desc[vringBufId].addr;
    }
    
    uint32_t RPMessage_vringGetTxBufLen(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
    
        return vringObj->desc[vringBufId].len;
    }
    
    int32_t RPMessage_vringGetFullRxBuf(uint16_t remoteCoreId, uint16_t *vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
        uint16_t head;
        uint32_t oldIntState;
        int32_t status = SystemP_TIMEOUT;
    
        oldIntState = HwiP_disable();
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            /* A new full buffer is available */
            if (vringObj->lastAvailIdx != vringObj->avail->idx)
            {
                head = vringObj->avail->ring[vringObj->lastAvailIdx % vringObj->vringNumBuf];
                vringObj->lastAvailIdx++;
    
                *vringBufId = head;
                status = SystemP_SUCCESS;
            }
            else
            {
                vringObj->used->flags &= (uint16_t)~VRING_USED_F_NO_NOTIFY;
            }
        }
        else
        {
            if (vringObj->lastUsedIdx != vringObj->used->idx)
            {
                head = (uint16_t)(vringObj->used->ring[vringObj->lastUsedIdx % vringObj->vringNumBuf].id);
                vringObj->lastUsedIdx++;
    
                *vringBufId = head;
    
                status = SystemP_SUCCESS;
            }
        }
    
        HwiP_restore(oldIntState);
    
        return status;
    }
    
    void RPMessage_vringPutEmptyRxBuf(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
        uint32_t oldIntState;
        uint32_t rxMsgValue;
    
        oldIntState = HwiP_disable();
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            struct vring_used_elem *used;
    
            used = &vringObj->used->ring[vringObj->used->idx % vringObj->vringNumBuf];
            used->id = vringBufId;
            used->len = vringObj->desc[vringBufId].len;
            vringObj->used->idx++;
    
            rxMsgValue = RPMESSAGE_LINUX_RX_VRING_ID; /* in case of linux this should be RX VRING ID */
        }
        else
        {
            uint16_t avail;
    
            avail = (uint16_t)(vringObj->avail->idx % vringObj->vringNumBuf);
            vringObj->avail->ring[avail] = vringBufId;
            vringObj->avail->idx++;
    
            rxMsgValue = RPMESSAGE_MSG_VRING_NEW_EMPTY;
        }
    
        #if defined(__aarch64__) || defined(__arm__)
        __asm__ __volatile__ ( "dsb sy"  "\n\t": : : "memory");
        __asm__ __volatile__ ( "isb sy"  "\n\t": : : "memory");
        #endif
        #if defined(_TMS320C6X)
        _mfence();
        _mfence();
        #endif
    
        HwiP_restore(oldIntState);
    
        IpcNotify_sendMsg(remoteCoreId,
            IPC_NOTIFY_CLIENT_ID_RPMSG,
            rxMsgValue,
            1 /* wait for message to be posted */
            );
    }
    
    uint32_t RPMessage_vringIsFullRxBuf(uint16_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
        uint32_t isNewFullBuf = 1;
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            if (vringObj->lastAvailIdx == vringObj->avail->idx)
            {
                isNewFullBuf = 0;
            }
        }
        else
        {
            if (vringObj->lastUsedIdx == vringObj->used->idx)
            {
                isNewFullBuf = 0;
            }
        }
    
        HwiP_restore(oldIntState);
    
        return isNewFullBuf;
    }
    
    uint8_t *RPMessage_vringGetRxBufAddr(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
    
        return (uint8_t *)vringObj->desc[vringBufId].addr;
    }
    
    uint32_t RPMessage_vringGetSize(uint16_t numBuf, uint16_t msgSize, uint32_t align)
    {
        return  RPMessage_align(
                    sizeof(struct vring_desc) * numBuf /* buffer descriptors for each buffer */
                  + sizeof(uint16_t) * (2 + numBuf)    /* avail queue */
                  , align
                )
                +
                RPMessage_align(
                      sizeof(uint16_t) * 2 + sizeof(struct vring_used_elem) * numBuf /* used queue */
                    , align
                    )
                +
                numBuf * msgSize /* message buffers */
                ;
    }
    
    void RPMessage_vringResetInternal(RPMessage_Vring *vringObj, uint16_t numBuf, uint16_t msgSize,
        uintptr_t vringBaseAddr,
        uint32_t offset_desc,
        uint32_t offset_avail,
        uint32_t offset_used,
        uint32_t offset_buf,
        uint32_t isTx
    )
    {
        uint8_t *bufAddr;
        uint16_t bufId;
    
        /* initialize locally visible variables */
        vringObj->lastUsedIdx  = 0;
        vringObj->lastAvailIdx = 0;
        vringObj->vringNumBuf  = numBuf;
    
        /* set address to vring descriptors, avail Q, used Q, message buffers */
        vringObj->desc        = (struct vring_desc  *)(vringBaseAddr + offset_desc);
        vringObj->avail       = (struct vring_avail *)(vringBaseAddr + offset_avail);
        vringObj->used        = (struct vring_used  *)(vringBaseAddr + offset_used);
        vringObj->bufBaseAddr = (uint8_t            *)(vringBaseAddr + offset_buf);
    
        /* only initialize TX vring, RX vring is initialized by the remote core */
        if(isTx)
        {
            /* initialize descriptors with message buffer address and max len */
            bufAddr = vringObj->bufBaseAddr;
            for(bufId=0; bufId<numBuf; bufId++)
            {
                vringObj->desc[bufId].addr    = (uint32_t)bufAddr;
                vringObj->desc[bufId].padding = 0;
                vringObj->desc[bufId].len     = msgSize;
                vringObj->desc[bufId].flags   = 0;
                vringObj->desc[bufId].next    = 0;
                bufAddr += msgSize;
            }
            /* initialize avail Q and add all buffers to avail Q */
            vringObj->avail->idx = 0;
            vringObj->avail->flags = 0;
            for(bufId=0; bufId<numBuf; bufId++)
            {
                vringObj->avail->ring[bufId] = bufId;
                vringObj->avail->idx++;
            }
            /* initialize used Q */
            vringObj->used->idx = 0;
            vringObj->used->flags = 0;
            for(bufId=0; bufId<numBuf; bufId++)
            {
                vringObj->used->ring[bufId].id = 0;
                vringObj->used->ring[bufId].len = 0;
            }
        }
    }
    
    void RPMessage_vringReset(uint16_t remoteCoreId, uint16_t isTx, const RPMessage_Params *params)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj;
        uintptr_t vringBaseAddr;
        uint32_t offset_desc, offset_avail, offset_used, offset_buf;
        uint32_t align, vringSize;
        uint16_t numBuf, msgSize;
    
        if(isTx)
        {
            vringObj = &coreObj->vringTxObj;
            vringBaseAddr = params->vringTxBaseAddr[remoteCoreId];
        }
        else
        {
            vringObj = &coreObj->vringRxObj;
            vringBaseAddr = params->vringRxBaseAddr[remoteCoreId];
        }
        align            = sizeof(uint32_t);
        numBuf           = params->vringNumBuf;
        msgSize          = params->vringMsgSize;
    
        /* get vring size, including descriptors, avail Q, used Q, message buffers and alignment */
        vringSize = RPMessage_vringGetSize(numBuf, msgSize, align);
    
        /* check if vring size is within limits of the memory available for vring */
        DebugP_assert( vringSize <= params->vringSize);
    
        /* calculate offset to vring descriptors, avail Q, used Q, message buffers
         * relative to vringBaseAddr
         */
        offset_desc  = 0;
        offset_avail = offset_desc  + sizeof(struct vring_desc) * numBuf;
        offset_used  = offset_avail + RPMessage_align( sizeof(uint16_t) * (2 + numBuf), align);
        offset_buf   = offset_used  + RPMessage_align( sizeof(uint16_t) * 2 + sizeof(struct vring_used_elem) * numBuf, align);
    
        RPMessage_vringResetInternal(vringObj,
            numBuf, msgSize,
            vringBaseAddr,
            offset_desc, offset_avail, offset_used, offset_buf,
            isTx
            );
    }
    
    /* VRING reset for Linux+RTOS is different vs RTOS+RTOS.
     * This function has the logic to handle these differences.
     */
    void RPMessage_vringResetLinux(uint16_t remoteCoreId, uint16_t isTx, const RPMessage_ResourceTable *rscTable)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj;
        uintptr_t vringBaseAddr;
        uint32_t offset_desc, offset_avail, offset_used, offset_buf;
        uint32_t align;
        uint16_t numBuf, msgSize;
    
        if(isTx)
        {
            vringObj = &coreObj->vringTxObj;
            vringBaseAddr = rscTable->vring0.da;
            align         = rscTable->vring0.align;
            numBuf        = rscTable->vring0.num;
        }
        else
        {
            vringObj = &coreObj->vringRxObj;
            vringBaseAddr = rscTable->vring1.da;
            align         = rscTable->vring1.align;
            numBuf        = rscTable->vring1.num;
        }
    
        msgSize          = RPMESSAGE_LINUX_MSG_SIZE;
    
        /* calculate offset to vring descriptors, avail Q, used Q, message buffers
         * relative to vringBaseAddr
         */
        offset_desc  = 0;
        offset_avail = offset_desc  + sizeof(struct vring_desc) * numBuf;
        offset_used  = offset_avail + RPMessage_align( sizeof(uint16_t) * (2 + numBuf), align);
        offset_buf   = offset_used  + RPMessage_align( sizeof(uint16_t) * 2 + sizeof(struct vring_used_elem) * numBuf, align);
        /* buffer offset is aligned to numBuf*msgSize*2, eg, 512*256*2 = 256KB after offset_used */
        offset_buf   = RPMessage_align( offset_buf, numBuf*msgSize*2);
    
        if(isTx)
        {
            /* offset_buf points to TX buffers already */
        }
        else
        {
            /* we don't really use offset_buf for the RX VRING, but the calculation is shown here for completeness.
             * RX buffers are initialized by the Linux side
             */
            offset_buf += numBuf*msgSize - (rscTable->vring1.da - rscTable->vring0.da);
        }
    
        RPMessage_vringResetInternal(vringObj,
            numBuf, msgSize,
            vringBaseAddr,
            offset_desc, offset_avail, offset_used, offset_buf,
            isTx
            );
    }
    

  • Hello,

    I went through the code you provided in this E2E thread.
    According to the AM263X TRM: "The sender must not initiate another message to the same receiver until the previously initiated mailbox interaction with the same receiver is complete."
    Can you check whether your high burst rate satisfies this condition?
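
    To make the TRM rule concrete, it can be modeled as a one-slot channel: a new send is refused until the receiver has completed the previous one. This is only a toy sketch. `MailboxModel`, `mailbox_try_send` and `mailbox_receive` are hypothetical names for illustration, not the AM263x register interface and not an SDK API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model of one sender-to-receiver mailbox channel.
 * It is NOT the AM263x register interface. */
typedef struct {
    bool     pending; /* true while the receiver has not completed the interaction */
    uint32_t value;   /* last value written by the sender */
} MailboxModel;

/* Sender side: refuse to start a new interaction while one is still pending. */
bool mailbox_try_send(MailboxModel *mb, uint32_t value)
{
    assert(mb != NULL);
    if (mb->pending) {
        return false; /* caller must retry later or give up after a timeout */
    }
    mb->value = value;
    mb->pending = true;
    return true;
}

/* Receiver side: read the value and complete the interaction. */
bool mailbox_receive(MailboxModel *mb, uint32_t *value)
{
    assert(mb != NULL && value != NULL);
    if (!mb->pending) {
        return false; /* nothing to read */
    }
    *value = mb->value;
    mb->pending = false;
    return true;
}
```

    In this model a second `mailbox_try_send` fails until `mailbox_receive` has run, so a burst that outpaces the receiver shows up as repeated send failures rather than lost messages.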

    Thanks,
    Gunjan

  • I assume this is the responsibility of the SDK's "RPMessage_send" function, isn't it?

  • Hello Alessandro,

    Apologies for the delay in responding. I tried to replicate your setup on my end.
    My changes:
    1. Copied your ipc_rpmsg_echo.c to all 4 cores.
    2. Disabled IPC for core 1 and core 3 using SysConfig.
    3. Set RP Message Number of Buffers = 16.

    After these changes I also got stuck with RPMessage_allocEndPtMsg returning a null pointer, and I tried to find the reason for it.
    While debugging, I noticed that core 1 and core 3 also tried to send messages to other cores using the code in your ipc_rpmsg_echo.c. Both of these cores assumed that they were core 0 (the default value of gIpcNotifyCtrl.selfCoreId is 0), so 3 cores acted as main_core in this case. To avoid this, I changed the code for core 1 and core 3 so that they do not initiate message transfers.

    After disabling message transfers from core 1 and core 3, core 0 tries to send messages to core 2 and gets this warning in the terminal: "WARNING: RPMessage_send:294: [IPC RPMSG] Message send to remote core 2 @ 13 end point failed due to lack of space in vring !!!"

    My assumption is that this happens because the burst rate is high. These are my observations; I hope they help.
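
    As a rough illustration of how a TX vring runs out of space, here is a simplified model of the index logic in the attached ipc_rpmsg_vring.c (the `lastAvailIdx == avail->idx` check). It is a sketch under assumptions, not the SDK code: `TxRingModel` and the `tx_ring_*` functions are hypothetical names, and the semaphore wait, interrupt locking and notifications are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define NUM_BUF 16u /* same value as "RP Message Number of Buffers" in this test */

/* Hypothetical, simplified model of the TX avail queue. */
typedef struct {
    uint16_t avail_ring[NUM_BUF];
    uint16_t avail_idx;      /* free-running, advanced when the receiver returns a buffer */
    uint16_t last_avail_idx; /* free-running, advanced when the sender takes a buffer */
} TxRingModel;

/* All buffers start out empty, i.e. in the avail queue. */
void tx_ring_init(TxRingModel *r)
{
    assert(r != NULL);
    r->last_avail_idx = 0u;
    r->avail_idx = 0u;
    for (uint16_t i = 0u; i < NUM_BUF; i++) {
        r->avail_ring[i] = i;
        r->avail_idx++;
    }
}

/* Sender: take one empty buffer; returns false when none is left. */
bool tx_ring_get_empty(TxRingModel *r, uint16_t *buf_id)
{
    assert(r != NULL && buf_id != NULL);
    if (r->last_avail_idx == r->avail_idx) {
        return false; /* lack of space: the receiver has not returned any buffer yet */
    }
    *buf_id = r->avail_ring[r->last_avail_idx % NUM_BUF];
    r->last_avail_idx++;
    return true;
}

/* Receiver: hand a processed buffer back to the sender. */
void tx_ring_return_empty(TxRingModel *r, uint16_t buf_id)
{
    assert(r != NULL);
    r->avail_ring[r->avail_idx % NUM_BUF] = buf_id;
    r->avail_idx++;
}
```

    With 16 buffers, the 17th `tx_ring_get_empty` call in a burst fails unless the receiver has called `tx_ring_return_empty` in the meantime, which is presumably the situation the warning above reports.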

    Thanks and Best Regards,
    Gunjan

  • Hi Gunjan,

    You are right: the WARNING message you see is expected and is due to the high transfer rate.

    In our code we have just two cores in a lockstep configuration: core 0_1 is in lockstep with core 0_0, and core 1_1 is in lockstep with core 1_0, so the problem is not caused by a faulty IPC configuration. Moreover, in the project for the modified example I gave you, I completely disabled the loading of core 0_1 and core 0_0 in the debug configuration.

    In the modified example, did you try to pause the cores and resume them to create a little desynchronization? (Our code transfers data completely asynchronously from one core to the other and vice versa.) Did you try to increase the rate?

    Regards

    Alessandro

  • Hello Alessandro,

    I was able to replicate the infinite loop on my end.
    This seems to be a valid issue, and it happens when the buffer gets full because of a high burst rate. I have raised an internal Jira bug to fix this. In the current example, whenever an IPC message is received it is stored in a buffer (if there is any space left), and the application checks this buffer later to handle the messages. When the buffer is full, you get stuck in an infinite while loop.
    For now, you can set RP Message Number of Buffers = 8 using SysConfig,
    OR reduce the burst rate,
    OR switch your application to callback mode by registering a handler that is called when an IPC message is received, instead of storing the message in the vring buffer for later processing. You can refer to the code below and to the ipc_notify example.

    RPMessage_CreateParams_init(&createParams);
    createParams.localEndPt = gServerEndPt;
    createParams.recvCallback = echocallback;
    status = RPMessage_construct(&gServerMsgObject, &createParams);
    DebugP_assert(status==SystemP_SUCCESS);
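
    To show why the loop never ends, here is a simplified model of the situation described in this thread: the "is full" check stays true while the local message allocation keeps failing, so no pass makes progress. This is a sketch under assumptions, not the SDK implementation. `RxModel` and the `rx_*` functions are hypothetical names, and the `max_passes` cap exists only so the demonstration terminates.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified model of the receive path. */
typedef struct {
    uint16_t used_idx;      /* advanced by the sender for every full buffer */
    uint16_t last_used_idx; /* advanced by the receiver when it takes a buffer */
    uint32_t pool_free;     /* free local message objects on the receiver */
} RxModel;

/* Stands in for the "is there a full RX buffer" check. */
static bool rx_is_full(const RxModel *m)
{
    return m->last_used_idx != m->used_idx;
}

/* Stands in for the local message allocation; fails when the pool is empty. */
static bool rx_alloc_local(RxModel *m)
{
    if (m->pool_free == 0u) {
        return false;
    }
    m->pool_free--;
    return true;
}

/* Application side: consuming a message frees one local message object. */
void rx_release_local(RxModel *m)
{
    assert(m != NULL);
    m->pool_free++;
}

/* Interrupt side: drain the ring. Returns the number of loop passes and sets
 * *stalled when the pass cap was hit. Without the cap the loop would not
 * terminate once the pool is empty, because the application that frees
 * objects cannot run while the interrupt handler is spinning. */
uint32_t rx_drain(RxModel *m, uint32_t max_passes, bool *stalled)
{
    uint32_t passes = 0u;

    assert(m != NULL && stalled != NULL);
    *stalled = false;
    while (rx_is_full(m)) {
        if (passes == max_passes) {
            *stalled = true;
            break;
        }
        passes++;
        if (rx_alloc_local(m)) {
            m->last_used_idx++; /* buffer handed to the end point queue */
        }
        /* else: nothing was taken from the ring, so rx_is_full() stays true */
    }
    return passes;
}
```

    With 6 full buffers and only 4 free local objects, `rx_drain` takes 4 buffers and then spins until the cap. Once the application has released objects with `rx_release_local`, the next drain completes. Callback mode avoids the stall because the message is handled in the callback rather than parked in a local message object.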

    Thanks and Best Regards,
    Gunjan