MCU-PLUS-SDK-AM243X: Does IpcNotify have a problem when messages are sent from both sides at once?

Part Number: MCU-PLUS-SDK-AM243X

Hello,

I changed our RPMessage IPC implementation from using a notify and then reading out the RPMessage buffer in a task context to an interrupt-driven implementation which reads out the IPC buffer as soon as the callback is called.

But here I faced two problems.

1.

When using RPMessage_send from both cores in nearly the same time window with a timeout of 0, it happens that one of the two cores (which one seems to depend on which is started first) returns with a timeout. This comes from the function RPMessage_vringGetEmptyTxBuf returning SystemP_FAILURE via the internal branch /* There's nothing available */, while the other core can send normally. If I raise the timeout to ten I can at least handle the first packets, but then this happens at a later point:

2.

This seems really strange: looking into the function, it hangs inside a loop which, following the logic, it can never leave, since mailboxIsFull stays 1 while both cores sit in this loop. And this happens while servicing the interrupt that was issued because the other core sent a telegram:

Those are the variables in this case for RPMessage:

and for IpcNotify:

and a register snippet for the two cores for the mailbox-registers:

The MAILBOX registers also show some more values in the memory view; not sure if this is important, since there are some bits set that are not visible in the register view:

This seems to be a deadlock situation that happens when both cores send messages simultaneously.

I changed to this implementation, coming from the task-context-based implementation, because it happened that one core was busy with other work while the other core had already filled the vring buffers and could not send anymore. I thought this would be a valid solution, but then those problems occurred.

I will trim this down to the relevant sequences since there is some of our own code around them. Keep in mind we initialize the needed params with valid values in our own driver implementation:

IpcNotify_Params notifyParams;
int32_t status;

/* initialize parameters to default */
IpcNotify_Params_init(&notifyParams);

/* specify the core on which this API is called */
notifyParams.selfCoreId = OWN_CORE_ID;
notifyParams.numCores = IPC_CORES - 1;

RPMessage_Params rpmsgParams;

/* initialize parameters to default */
RPMessage_Params_init(&rpmsgParams);    
/* initialize the IPC Notify module */
status = IpcNotify_init(&notifyParams);

rpmsgParams.vringSize = IPC_RPMESSAGE_VRING_SIZE;
rpmsgParams.vringNumBuf = IPC_RPMESSAGE_NUM_VRING_BUF;
rpmsgParams.vringMsgSize = IPC_RPMESSAGE_MAX_VRING_BUF_SIZE;

/* initialize the IPC RP Message module */
status = RPMessage_init(&rpmsgParams);

IpcNotify_syncAll(SystemP_WAIT_FOREVER);
/* create RPParams afterwards since it would overwrite the notify-sync-callback */
RPMessage_CreateParams createParams;

RPMessage_CreateParams_init(&createParams);
createParams.recvCallback = &newSysMessageCbStatic;
createParams.recvCallbackArgs = reinterpret_cast<void*>(this);
createParams.localEndPt = SYSTEM_CHANNEL_ENDPT_ID;

RPMessage_construct(&systemChannelMsgObject_, &createParams);

// at other points in the program on both cores:
if (RPMessage_send(ptrToData, dataSize, TARGET_CORE_ID, SYSTEM_CHANNEL_ENDPT_ID, SYSTEM_CHANNEL_ENDPT_ID, 0) == SystemP_SUCCESS)

// the callback
void IpcDriver::newSysMessageCbStatic(RPMessage_Object *obj, void *arg, void *data, uint16_t dataLen, uint16_t remoteCoreId, uint16_t remoteEndPt)
{
    reinterpret_cast<IpcDriver*>(arg)->newSysMessageCb(data);
}

The callback gets called on the other core! So in my understanding it should consume the packet, do its work inside the callback and then free the buffer memory again.

So I would not expect any problem with a fully interrupt-based solution, since even if multiple cores send messages at the same time, the RPMessage implementation should handle it. But here it seems it does not.

We also ensured that the interrupt stacks are large enough:

Edit/Update:
I also tried infinite timeout values; the behaviour stays the same as in 2.

I also noticed that a warm reset does not reset the IPC-related state correctly. I also got this "hanging" bug at the IpcNotify_syncAll call after a CPU reset and reload of the images in CCS. Is this preventable? A clean reset would be a nice feature.

Setup:

We have a region defined in the linker for the IPC shared memory. It starts at 0x701D0000 in our case (IPC starts at 0x701D8000). The MPU is set accordingly to non-cached but shared. We have our own IpcDriver but are using RPMessage. We have ensured that the VRING is allocated exactly as SysCfg would normally do it. Without going too deep into detail: the whole setup worked before, when we used the notify and then fetched the packet in a task context instead of direct callbacks. The problem first appeared as soon as I changed to the direct callback.

Best regards

Felix

  • Hi Felix,

    The timeout implementation is missing when IpcNotify_sendMsg() is called by RPMessage_send().

    Please replace the IPC RPMsg driver files (ipc_rpmsg.c, ipc_rpmsg_priv.h, ipc_rpmsg_vring.c) at "source/drivers/ipc_rpmsg" with the files below.

    Can you check if this update helps to resolve the issue?

    ipc_rpmsg_vring.c

    /*
     *  Copyright (C) 2018-2021 Texas Instruments Incorporated
     *
     *  Redistribution and use in source and binary forms, with or without
     *  modification, are permitted provided that the following conditions
     *  are met:
     *
     *    Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     *    Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the
     *    distribution.
     *
     *    Neither the name of Texas Instruments Incorporated nor the names of
     *    its contributors may be used to endorse or promote products derived
     *    from this software without specific prior written permission.
     *
     *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    
    #include <drivers/ipc_rpmsg/ipc_rpmsg_priv.h>
    
    /* NOTE:
     * For RTOS to RTOS IPC RPMessage
     *   - AVAIL in vring = EMPTY in ipc rpmsg implementation
     *   - USED in vring = FULL  in ipc rpmsg implementation
     *
     * For Linux to RTOS IPC RPMessage
     * - When doing TX from RTOS
     *   - AVAIL in vring = EMPTY in ipc rpmsg implementation
     *   - USED in vring = FULL  in ipc rpmsg implementation
     * - When doing RX from RTOS
     *   - AVAIL in vring = FULL in ipc rpmsg implementation
     *   - USED in vring = EMPTY  in ipc rpmsg implementation
     *
     * RPMessage_isLinuxCore is used in Rx APIs to switch the meaning when
     * receiving messages from core ID which runs Linux
     */
    
    #define VRING_USED_F_NO_NOTIFY  (1U)
    
    int32_t RPMessage_vringGetEmptyTxBuf(uint16_t remoteCoreId, uint16_t *vringBufId, uint32_t timeout)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
        uint32_t oldIntState;
        uint16_t head;
        int32_t status = SystemP_FAILURE;
        uint32_t done = 0;
    
        oldIntState = HwiP_disable();
    
        do
        {
            /* There's nothing available */
            if (vringObj->lastAvailIdx == vringObj->avail->idx)
            {
                /* We need to know about added buffers */
                vringObj->used->flags &= (uint16_t)~VRING_USED_F_NO_NOTIFY;
    
                HwiP_restore(oldIntState);
    
                status = SemaphoreP_pend(&coreObj->newEmptyVringBufSem, timeout);
                if(status==SystemP_TIMEOUT)
                {
                    done = 1;
                }
    
                oldIntState = HwiP_disable();
            }
            else
            {
                head = vringObj->avail->ring[vringObj->lastAvailIdx % vringObj->vringNumBuf];
                vringObj->lastAvailIdx++;
    
                *vringBufId = head;
                done = 1;
                status = SystemP_SUCCESS;
            }
        } while( ! done );
    
        HwiP_restore(oldIntState);
    
        return status;
    }
    
    int32_t  RPMessage_vringPutFullTxBuf(uint16_t remoteCoreId, uint16_t vringBufId, uint16_t dataLen, uint32_t timeout)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
        struct vring_used_elem *used;
        uint32_t oldIntState;
        uint32_t txMsgValue = RPMESSAGE_MSG_VRING_NEW_FULL;
        int32_t status = SystemP_FAILURE;
        uint32_t elapsedTicks, startTicks;
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            /* for linux we need to send the TX VRING ID in the mailbox message */
            txMsgValue = RPMESSAGE_LINUX_TX_VRING_ID;
        }
    
        oldIntState = HwiP_disable();
    
        used = &vringObj->used->ring[vringObj->used->idx % vringObj->vringNumBuf];
        used->id = vringBufId;
        used->len = dataLen;
        vringObj->used->idx++;
    
        #if defined(__aarch64__) || defined(__arm__)
        __asm__ __volatile__ ( "dsb sy"  "\n\t": : : "memory");
        __asm__ __volatile__ ( "isb sy"  "\n\t": : : "memory");
        #endif
        #if defined(_TMS320C6X)
        _mfence();
        _mfence();
        #endif
    
        HwiP_restore(oldIntState);
    
        startTicks = ClockP_getTicks();
        do
        {
            status = IpcNotify_sendMsg(remoteCoreId, IPC_NOTIFY_CLIENT_ID_RPMSG, txMsgValue, 0);
            elapsedTicks = ClockP_getTicks() - startTicks;
        } while((elapsedTicks < timeout) && (status == SystemP_TIMEOUT));
    
        if(elapsedTicks >= timeout)
        {
            status = SystemP_TIMEOUT;
        }
    
        return status;
    }
    
    void RPMessage_vringCheckEmptyTxBuf(uint16_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
        uint32_t isNewEmptyBuf = 1;
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
    
        if (vringObj->lastAvailIdx == vringObj->avail->idx)
        {
            isNewEmptyBuf = 0;
        }
    
        HwiP_restore(oldIntState);
    
        if(isNewEmptyBuf)
        {
            SemaphoreP_post(&coreObj->newEmptyVringBufSem);
        }
    }
    
    uint8_t *RPMessage_vringGetTxBufAddr(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
    
        return (uint8_t *)vringObj->desc[vringBufId].addr;
    }
    
    uint32_t RPMessage_vringGetTxBufLen(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringTxObj;
    
        return vringObj->desc[vringBufId].len;
    }
    
    int32_t RPMessage_vringGetFullRxBuf(uint16_t remoteCoreId, uint16_t *vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
        uint16_t head;
        uint32_t oldIntState;
        int32_t status = SystemP_TIMEOUT;
    
        oldIntState = HwiP_disable();
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            /* There's nothing available */
            if (vringObj->lastAvailIdx != vringObj->avail->idx)
            {
                head = vringObj->avail->ring[vringObj->lastAvailIdx % vringObj->vringNumBuf];
                vringObj->lastAvailIdx++;
    
                *vringBufId = head;
                status = SystemP_SUCCESS;
            }
            else
            {
                vringObj->used->flags &= (uint16_t)~VRING_USED_F_NO_NOTIFY;
            }
        }
        else
        {
            if (vringObj->lastUsedIdx != vringObj->used->idx)
            {
                head = (uint16_t)(vringObj->used->ring[vringObj->lastUsedIdx % vringObj->vringNumBuf].id);
                vringObj->lastUsedIdx++;
    
                *vringBufId = head;
    
                status = SystemP_SUCCESS;
            }
        }
    
        HwiP_restore(oldIntState);
    
        return status;
    }
    
    void RPMessage_vringPutEmptyRxBuf(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
        uint32_t oldIntState;
        uint32_t rxMsgValue;
    
        oldIntState = HwiP_disable();
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            struct vring_used_elem *used;
    
            used = &vringObj->used->ring[vringObj->used->idx % vringObj->vringNumBuf];
            used->id = vringBufId;
            used->len = vringObj->desc[vringBufId].len;
            vringObj->used->idx++;
    
            rxMsgValue = RPMESSAGE_LINUX_RX_VRING_ID; /* in case of linux this should be RX VRING ID */
        }
        else
        {
            uint16_t avail;
    
            avail = (uint16_t)(vringObj->avail->idx % vringObj->vringNumBuf);
            vringObj->avail->ring[avail] = vringBufId;
            vringObj->avail->idx++;
    
            rxMsgValue = RPMESSAGE_MSG_VRING_NEW_EMPTY;
        }
    
        #if defined(__aarch64__) || defined(__arm__)
        __asm__ __volatile__ ( "dsb sy"  "\n\t": : : "memory");
        __asm__ __volatile__ ( "isb sy"  "\n\t": : : "memory");
        #endif
        #if defined(_TMS320C6X)
        _mfence();
        _mfence();
        #endif
    
        HwiP_restore(oldIntState);
    
        IpcNotify_sendMsg(remoteCoreId,
            IPC_NOTIFY_CLIENT_ID_RPMSG,
            rxMsgValue,
            1 /* wait for message to be posted */
            );
    }
    
    uint32_t RPMessage_vringIsFullRxBuf(uint16_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
        uint32_t isNewFullBuf = 1;
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
    
        if(RPMessage_isLinuxCore(remoteCoreId))
        {
            if (vringObj->lastAvailIdx == vringObj->avail->idx)
            {
                isNewFullBuf = 0;
            }
        }
        else
        {
            if (vringObj->lastUsedIdx == vringObj->used->idx)
            {
                isNewFullBuf = 0;
            }
        }
    
        HwiP_restore(oldIntState);
    
        return isNewFullBuf;
    }
    
    uint8_t *RPMessage_vringGetRxBufAddr(uint16_t remoteCoreId, uint16_t vringBufId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj = &coreObj->vringRxObj;
    
        return (uint8_t *)vringObj->desc[vringBufId].addr;
    }
    
    uint32_t RPMessage_vringGetSize(uint16_t numBuf, uint16_t msgSize, uint32_t align)
    {
        return  RPMessage_align(
                    sizeof(struct vring_desc) * numBuf /* buffer descriptors for each buffer */
                  + sizeof(uint16_t) * (2 + numBuf)    /* avail queue */
                  , align
                )
                +
                RPMessage_align(
                      sizeof(uint16_t) * 2 + sizeof(struct vring_used_elem) * numBuf /* used queue */
                    , align
                    )
                +
                numBuf * msgSize /* message buffers */
                ;
    }
    
    void RPMessage_vringResetInternal(RPMessage_Vring *vringObj, uint16_t numBuf, uint16_t msgSize,
        uintptr_t vringBaseAddr,
        uint32_t offset_desc,
        uint32_t offset_avail,
        uint32_t offset_used,
        uint32_t offset_buf,
        uint32_t isTx
    )
    {
        uint8_t *bufAddr;
        uint16_t bufId;
    
        /* initialize locally visible variables */
        vringObj->lastUsedIdx  = 0;
        vringObj->lastAvailIdx = 0;
        vringObj->vringNumBuf  = numBuf;
    
        /* set address to vring descriptors, avail Q, used Q, message buffers */
        vringObj->desc        = (struct vring_desc  *)(vringBaseAddr + offset_desc);
        vringObj->avail       = (struct vring_avail *)(vringBaseAddr + offset_avail);
        vringObj->used        = (struct vring_used  *)(vringBaseAddr + offset_used);
        vringObj->bufBaseAddr = (uint8_t            *)(vringBaseAddr + offset_buf);
    
        /* only initialize TX vring, RX vring is initialized by the remote core */
        if(isTx)
        {
            /* initialize descriptors with message buffer address and max len */
            bufAddr = vringObj->bufBaseAddr;
            for(bufId=0; bufId<numBuf; bufId++)
            {
                vringObj->desc[bufId].addr    = (uint32_t)bufAddr;
                vringObj->desc[bufId].padding = 0;
                vringObj->desc[bufId].len     = msgSize;
                vringObj->desc[bufId].flags   = 0;
                vringObj->desc[bufId].next    = 0;
                bufAddr += msgSize;
            }
            /* initialize avail Q and add all buffers to avail Q */
            vringObj->avail->idx = 0;
            vringObj->avail->flags = 0;
            for(bufId=0; bufId<numBuf; bufId++)
            {
                vringObj->avail->ring[bufId] = bufId;
                vringObj->avail->idx++;
            }
            /* initialize used Q */
            vringObj->used->idx = 0;
            vringObj->used->flags = 0;
            for(bufId=0; bufId<numBuf; bufId++)
            {
                vringObj->used->ring[bufId].id = 0;
                vringObj->used->ring[bufId].len = 0;
            }
        }
    }
    
    void RPMessage_vringReset(uint16_t remoteCoreId, uint16_t isTx, const RPMessage_Params *params)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj;
        uintptr_t vringBaseAddr;
        uint32_t offset_desc, offset_avail, offset_used, offset_buf;
        uint32_t align, vringSize;
        uint16_t numBuf, msgSize;
    
        if(isTx)
        {
            vringObj = &coreObj->vringTxObj;
            vringBaseAddr = params->vringTxBaseAddr[remoteCoreId];
        }
        else
        {
            vringObj = &coreObj->vringRxObj;
            vringBaseAddr = params->vringRxBaseAddr[remoteCoreId];
        }
        align            = sizeof(uint32_t);
        numBuf           = params->vringNumBuf;
        msgSize          = params->vringMsgSize;
    
        /* get vring size, including descriptors, avail Q, used Q, message buffers and alignment */
        vringSize = RPMessage_vringGetSize(numBuf, msgSize, align);
    
        /* check if vring ID is within limits of the memory available for vring */
        DebugP_assert( vringSize <= params->vringSize);
    
        /* calculate offset to vring descriptors, avail Q, used Q, message buffers
         * relative to vringBaseAddr
         */
        offset_desc  = 0;
        offset_avail = offset_desc  + sizeof(struct vring_desc) * numBuf;
        offset_used  = offset_avail + RPMessage_align( sizeof(uint16_t) * (2 + numBuf), align);
        offset_buf   = offset_used  + RPMessage_align( sizeof(uint16_t) * 2 + sizeof(struct vring_used_elem) * numBuf, align);
    
        RPMessage_vringResetInternal(vringObj,
            numBuf, msgSize,
            vringBaseAddr,
            offset_desc, offset_avail, offset_used, offset_buf,
            isTx
            );
    }
    
    /* VRING reset for Linux+RTOS is different vs RTOS+RTOS.
     * This function has the logic to handle these differences.
     */
    void RPMessage_vringResetLinux(uint16_t remoteCoreId, uint16_t isTx, const RPMessage_ResourceTable *rscTable)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_Vring *vringObj;
        uintptr_t vringBaseAddr;
        uint32_t offset_desc, offset_avail, offset_used, offset_buf;
        uint32_t align;
        uint16_t numBuf, msgSize;
    
        if(isTx)
        {
            vringObj = &coreObj->vringTxObj;
            vringBaseAddr = rscTable->vring0.da;
            align         = rscTable->vring0.align;
            numBuf        = rscTable->vring0.num;
        }
        else
        {
            vringObj = &coreObj->vringRxObj;
            vringBaseAddr = rscTable->vring1.da;
            align         = rscTable->vring1.align;
            numBuf        = rscTable->vring1.num;
        }
    
        msgSize          = RPMESSAGE_LINUX_MSG_SIZE;
    
        /* calculate offset to vring descriptors, avail Q, used Q, message buffers
         * relative to vringBaseAddr
         */
        offset_desc  = 0;
        offset_avail = offset_desc  + sizeof(struct vring_desc) * numBuf;
        offset_used  = offset_avail + RPMessage_align( sizeof(uint16_t) * (2 + numBuf), align);
        offset_buf   = offset_used  + RPMessage_align( sizeof(uint16_t) * 2 + sizeof(struct vring_used_elem) * numBuf, align);
        /* buffer offset is aligned to numBuf*msgSize*2, eg, 512*256*2 = 256KB after offset_used */
        offset_buf   = RPMessage_align( offset_buf, numBuf*msgSize*2);
    
        if(isTx)
        {
            /* offset_buf points to TX buffers already */
        }
        else
        {
            /* we dont really use offset buf for RX VRING but showing the calculation here for completeness
             * RX buffers are initialized by Linux side
             */
            offset_buf += numBuf*msgSize - (rscTable->vring1.da - rscTable->vring0.da);
        }
    
        RPMessage_vringResetInternal(vringObj,
            numBuf, msgSize,
            vringBaseAddr,
            offset_desc, offset_avail, offset_used, offset_buf,
            isTx
            );
    }
    
    ipc_rpmsg.c

    /*
     *  Copyright (C) 2018-2021 Texas Instruments Incorporated
     *
     *  Redistribution and use in source and binary forms, with or without
     *  modification, are permitted provided that the following conditions
     *  are met:
     *
     *    Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
     *
     *    Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the
     *    distribution.
     *
     *    Neither the name of Texas Instruments Incorporated nor the names of
     *    its contributors may be used to endorse or promote products derived
     *    from this software without specific prior written permission.
     *
     *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     */
    
    #include <drivers/ipc_rpmsg/ipc_rpmsg_priv.h>
    
    IpcRpmsg_Ctrl gIpcRpmsgCtrl;
    
    RPMessage_LocalMsg *RPMessage_allocEndPtMsg(uint32_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        RPMessage_LocalMsg *pMsg;
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
        pMsg = (RPMessage_LocalMsg*)RPMessage_queueGet(&coreObj->freeQ);
        if(pMsg == NULL)
        {
            coreObj->freeQAllocPending = 1;
        }
        else
        {
            coreObj->freeQAllocPending = 0;
        }
        HwiP_restore(oldIntState);
    
        return pMsg;
    }
    
    uint32_t RPMessage_freeEndPtMsg(uint16_t remoteCoreId, RPMessage_LocalMsg *pMsg)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        uint32_t oldIntState, isAllocPending;
    
        oldIntState = HwiP_disable();
        isAllocPending = coreObj->freeQAllocPending;
        RPMessage_queuePut(&coreObj->freeQ, &pMsg->elem);
        HwiP_restore(oldIntState);
    
        return isAllocPending;
    }
    
    void RPMessage_putEndPtMsg(RPMessage_Struct *obj, RPMessage_LocalMsg *pMsg)
    {
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
        RPMessage_queuePut(&obj->endPtQ, &pMsg->elem);
        HwiP_restore(oldIntState);
    
        SemaphoreP_post(&obj->newEndPtMsgSem);
    }
    
    int32_t RPMessage_getEndPtMsg(RPMessage_Struct *obj, RPMessage_LocalMsg **pMsg, uint32_t timeout)
    {
        uint32_t oldIntState, done;
        int32_t status = SystemP_TIMEOUT;
    
        done = 0;
        do {
            oldIntState = HwiP_disable();
            *pMsg = (RPMessage_LocalMsg*)RPMessage_queueGet(&obj->endPtQ);
            HwiP_restore(oldIntState);
    
            if(*pMsg==NULL)
            {
                status = SemaphoreP_pend(&obj->newEndPtMsgSem, timeout);
                if(status == SystemP_TIMEOUT)
                {
                    done = 1;
                }
                if(status == SystemP_SUCCESS && obj->doRecvUnblock)
                {
                    status = SystemP_TIMEOUT;
                    done = 1;
                }
            }
            else
            {
                status = SystemP_SUCCESS;
                done = 1;
            }
        } while( ! done );
    
        return status;
    }
    
    /* handle one new received message from vring */
    void RPMessage_recvHandler(uint32_t remoteCoreId)
    {
        uint16_t vringBufId;
        int32_t status;
        RPMessage_LocalMsg *pMsg;
    
        /* get a free message pointer to hold vring buf info
         * if no free message pointer then dont remove from vring
         */
        pMsg = RPMessage_allocEndPtMsg(remoteCoreId);
        if(pMsg!=NULL)
        {
            status = RPMessage_vringGetFullRxBuf(remoteCoreId, &vringBufId);
            if(status == SystemP_SUCCESS)
            {
                /* message in vring, extract it and copy info to message pointer and put in end point Q */
                uint8_t *vringBufAddr = RPMessage_vringGetRxBufAddr(remoteCoreId, vringBufId);
                RPMessage_Header *header = (RPMessage_Header *)vringBufAddr;
                uint16_t localEndPt = header->dstEndPt;
    
                status = SystemP_FAILURE;
                if(localEndPt < RPMESSAGE_MAX_LOCAL_ENDPT)
                {
                    RPMessage_Struct *obj = gIpcRpmsgCtrl.localEndPtObj[localEndPt];
                    if(obj!=NULL)
                    {
                        if(obj->recvCallback != NULL)
                        {
                            /* recv messages handled in callback mode */
                            obj->recvCallback( (RPMessage_Object*)obj,
                                obj->recvCallbackArgs,
                                &vringBufAddr[sizeof(RPMessage_Header)],
                                header->dataLen,
                                remoteCoreId,
                                header->srcEndPt
                                );
                            status = SystemP_SUCCESS;
    
                            /* pMsg is not used, free it */
                            RPMessage_freeEndPtMsg(remoteCoreId, pMsg);
                            /* done using vring buf, free it */
                            RPMessage_vringPutEmptyRxBuf(remoteCoreId, vringBufId);
                        }
                        else
                        {
                            /* recv messages handled in non-callback mode */
                            pMsg->remoteCoreId = remoteCoreId;
                            pMsg->vringBufId = vringBufId;
                            RPMessage_putEndPtMsg(obj, pMsg);
                            status = SystemP_SUCCESS;
    
                            if(obj->recvNotifyCallback!=NULL)
                            {
                                obj->recvNotifyCallback((RPMessage_Object*)obj, obj->recvNotifyCallbackArgs);
                            }
                        }
                    }
                }
                if(status!=SystemP_SUCCESS)
                {
                    /* invalid vring message header or invalid endpt
                    * or no object registered for local end pt, so no need handle the message pointer,
                    * free it
                    */
                    RPMessage_vringPutEmptyRxBuf(remoteCoreId, vringBufId);
                }
            }
            if(status!=SystemP_SUCCESS)
            {
                /* no message in vring or invalid vring message header or invalid endpt
                * or no object registered for local end pt, so no need handle the message pointer,
                * free it
                */
                RPMessage_freeEndPtMsg(remoteCoreId, pMsg);
            }
        }
    }
    
    void RPMessage_notifyCallback(uint32_t remoteCoreId, uint16_t localClientId, uint32_t msgValue, void *args)
    {
        if(gIpcRpmsgCtrl.isCoreEnable[remoteCoreId] && gIpcRpmsgCtrl.isCoreInitialized[remoteCoreId])
        {
            uint16_t rxMsgValue = RPMESSAGE_MSG_VRING_NEW_FULL;
    
            if(RPMessage_isLinuxCore(remoteCoreId))
            {
                rxMsgValue = RPMESSAGE_LINUX_RX_VRING_ID; /* In linux, we get RX VRING ID, which is 1 in linux */
            }
            if(msgValue == rxMsgValue)
            {   /* check full ring */
                while(RPMessage_vringIsFullRxBuf(remoteCoreId))
                {
                    RPMessage_recvHandler(remoteCoreId);
                }
            }
            else
            {   /* check empty ring */
    
                /* check if there is any new empty buf, if yes then post semaphore to wake up any waiting threads */
                RPMessage_vringCheckEmptyTxBuf(remoteCoreId);
            }
        }
    }
    
    int32_t RPMessage_send( void*    data,
                            uint16_t dataLen,
                            uint16_t remoteCoreId,
                            uint16_t remoteEndPt,
                            uint16_t localEndPt,
                            uint32_t timeout
                          )
    {
        int32_t status = SystemP_FAILURE;
    
        if(remoteCoreId < CSL_CORE_ID_MAX && gIpcRpmsgCtrl.isCoreEnable[remoteCoreId]
            && data != NULL && dataLen != 0
            )
        {
            uint16_t vringBufId;
    
            status = RPMessage_vringGetEmptyTxBuf(remoteCoreId, &vringBufId, timeout);
            if(status == SystemP_SUCCESS)
            {
                uint8_t *vringBufAddr = RPMessage_vringGetTxBufAddr(remoteCoreId, vringBufId);
                uint16_t vringBufLen = RPMessage_vringGetTxBufLen(remoteCoreId, vringBufId);
                RPMessage_Header *header = (RPMessage_Header *)vringBufAddr;
    
                if(dataLen > (vringBufLen - sizeof(RPMessage_Header)) )
                {
                    dataLen = vringBufLen - sizeof(RPMessage_Header);
    
                    DebugP_logWarn("[IPC RPMSG] Message send to remote core %d @ %d end point truncated due to lack of space in vring buffer !!!\r\n",
                        remoteCoreId, remoteEndPt);
                }
    
                header->srcEndPt = localEndPt;
                header->dstEndPt = remoteEndPt;
                header->srcCoreId = gIpcRpmsgCtrl.selfCoreId;
                header->flags = 0;
                header->dataLen = dataLen;
    
                memcpy( &vringBufAddr[sizeof(RPMessage_Header)], data, dataLen);
    
                status = RPMessage_vringPutFullTxBuf(remoteCoreId, vringBufId, dataLen + sizeof(RPMessage_Header), timeout);
    
                if(status != SystemP_SUCCESS)
                {
                    DebugP_logWarn("[IPC RPMSG] Message send to remote core %d @ %d end point failed due to lack of space in Notify Queue !!!\r\n",
                    remoteCoreId, remoteEndPt);
                }
            }
            else
            {
                DebugP_logWarn("[IPC RPMSG] Message send to remote core %d @ %d end point failed due to lack of space in vring !!!\r\n",
                    remoteCoreId, remoteEndPt);
            }
        }
        else
        {
            DebugP_logError("[IPC RPMSG] Message send to remote core %d @ %d end point failed due to invalid parameters !!!\r\n",
                remoteCoreId, remoteEndPt
                );
        }
        return status;
    }
    
    int32_t RPMessage_recv(RPMessage_Object *handle, void* data, uint16_t *dataLen,
                          uint16_t *remoteCoreId, uint16_t *remoteEndPt, uint32_t timeout)
    {
        int32_t status = SystemP_FAILURE;
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        if( data != NULL && dataLen != NULL && remoteCoreId != NULL && remoteEndPt != NULL
            && obj->recvCallback == NULL /* i.e non-callback mode */
          )
        {
            RPMessage_LocalMsg *pMsg;
    
            status = RPMessage_getEndPtMsg(obj, &pMsg, timeout);
            if(status == SystemP_SUCCESS && pMsg != NULL)
            {
                uint32_t isAllocPending = 0;
                uint16_t vringBufId = pMsg->vringBufId;
                uint8_t *vringBufAddr = RPMessage_vringGetRxBufAddr(pMsg->remoteCoreId, vringBufId);
                RPMessage_Header *header = (RPMessage_Header *)vringBufAddr;
    
                *remoteCoreId = pMsg->remoteCoreId;
                *remoteEndPt =  header->srcEndPt;
    
                if( *dataLen < header->dataLen )
                {
                    DebugP_logWarn("[IPC RPMSG] Message recv @ %d local end point truncated due to insufficient user buffer size !!!\r\n",
                        obj->localEndPt
                        );
                }
                else
                {
                    *dataLen = header->dataLen;
                }
    
                memcpy( data, &vringBufAddr[sizeof(RPMessage_Header)], *dataLen);
    
                RPMessage_vringPutEmptyRxBuf(*remoteCoreId, vringBufId);
                isAllocPending = RPMessage_freeEndPtMsg(*remoteCoreId, pMsg);
                if(isAllocPending)
                {   /* if any message handling is pending due to the free Q being empty,
                     * there will now be at least one element to handle any pending vring requests.
                     * So check vring and handle pending messages if any
                     */
                    RPMessage_notifyCallback(*remoteCoreId,
                        IPC_NOTIFY_CLIENT_ID_RPMSG,
                        RPMESSAGE_MSG_VRING_NEW_FULL,
                        NULL);
                }
            }
            else
            {
                if(status != SystemP_TIMEOUT)
                {
                    DebugP_logError("[IPC RPMSG] Message recv @ %d local end point failed due to invalid end point Q !!!\r\n",
                        obj->localEndPt
                        );
                }
            }
        }
        else
        {
            DebugP_logError("[IPC RPMSG] Message recv @ %d local end point failed due to invalid parameters !!!\r\n",
                obj->localEndPt
                );
        }
        return status;
    }
    
    void RPMessage_unblock(RPMessage_Object *handle)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        obj->doRecvUnblock = 1;
        SemaphoreP_post(&obj->newEndPtMsgSem);
    }
    
    uint16_t RPMessage_getLocalEndPt(const RPMessage_Object *handle)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        return obj->localEndPt;
    }
    
    int32_t RPMessage_construct(RPMessage_Object *handle, const RPMessage_CreateParams *createParams)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
        int32_t status = SystemP_FAILURE;
    
        DebugP_assert(sizeof(RPMessage_Object) >= sizeof(RPMessage_Struct));
    
        if(createParams->localEndPt < RPMESSAGE_MAX_LOCAL_ENDPT
            && gIpcRpmsgCtrl.localEndPtObj[createParams->localEndPt] == NULL)
        {
            obj->localEndPt = createParams->localEndPt;
            obj->recvCallback = createParams->recvCallback;
            obj->recvCallbackArgs = createParams->recvCallbackArgs;
            obj->recvNotifyCallback = createParams->recvNotifyCallback;
            obj->recvNotifyCallbackArgs = createParams->recvNotifyCallbackArgs;
            obj->doRecvUnblock = 0;
            RPMessage_queueReset(&obj->endPtQ);
            SemaphoreP_constructBinary(&obj->newEndPtMsgSem, 0);
    
            gIpcRpmsgCtrl.localEndPtObj[createParams->localEndPt] = obj;
    
            status = SystemP_SUCCESS;
        }
        return status;
    }
    
    void RPMessage_destruct(RPMessage_Object *handle)
    {
        RPMessage_Struct *obj = (RPMessage_Struct *)handle;
    
        if(obj->localEndPt < RPMESSAGE_MAX_LOCAL_ENDPT &&
            gIpcRpmsgCtrl.localEndPtObj[obj->localEndPt] != NULL)
        {
            gIpcRpmsgCtrl.localEndPtObj[obj->localEndPt] = NULL;
    
            obj->localEndPt = RPMESSAGE_MAX_LOCAL_ENDPT;
            obj->recvCallback = NULL;
            obj->recvCallbackArgs = NULL;
            obj->doRecvUnblock = 0;
            RPMessage_queueReset(&obj->endPtQ);
            SemaphoreP_destruct(&obj->newEndPtMsgSem);
        }
    }
    
    void RPMessage_CreateParams_init(RPMessage_CreateParams *params)
    {
        params->localEndPt = RPMESSAGE_MAX_LOCAL_ENDPT;
        params->recvCallback = NULL;
        params->recvCallbackArgs = NULL;
        params->recvNotifyCallback = NULL;
        params->recvNotifyCallbackArgs = NULL;
    }
    
    void RPMessage_Params_init(RPMessage_Params *params)
    {
        uint16_t coreId;
    
        memset(params, 0, sizeof(RPMessage_Params));
    
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            params->vringTxBaseAddr[coreId] = RPMESSAGE_VRING_ADDR_INVALID;
            params->vringRxBaseAddr[coreId] = RPMESSAGE_VRING_ADDR_INVALID;
        }
        params->vringNumBuf = 8;
        params->vringMsgSize = 128;
        params->vringSize = RPMESSAGE_VRING_SIZE(params->vringNumBuf, params->vringMsgSize);
        params->linuxCoreId = CSL_CORE_ID_MAX;
        params->linuxResourceTable = NULL;
    }
    
    int32_t  RPMessage_coreInit(uint16_t remoteCoreId, const RPMessage_Params *params)
    {
        int32_t status = SystemP_SUCCESS;
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
        uint16_t elemId;
    
        SemaphoreP_constructBinary(&coreObj->newEmptyVringBufSem, 0);
        coreObj->freeQAllocPending = 0;
        RPMessage_queueReset(&coreObj->freeQ);
        for(elemId=0; elemId<RPMESSAGE_MAX_LOCAL_MSG_OBJ; elemId++)
        {
            RPMessage_queuePut(&coreObj->freeQ, &coreObj->localMsgObj[elemId].elem);
        }
        /* Linux VRINGs we will init later inside RPMessage_waitForLinuxReady() */
        if(gIpcRpmsgCtrl.isCoreEnable[remoteCoreId] && !RPMessage_isLinuxCore(remoteCoreId))
        {
            /* reset RX ring */
            RPMessage_vringReset(remoteCoreId, 0, params);
            /* reset TX ring */
            RPMessage_vringReset(remoteCoreId, 1, params);
    
            /* mark core data structure as initialized, now we can handle interrupts */
            gIpcRpmsgCtrl.isCoreInitialized[remoteCoreId] = 1;
        }
        return status;
    }
    
    void RPMessage_coreDeInit(uint16_t remoteCoreId)
    {
        RPMessage_Core *coreObj = &gIpcRpmsgCtrl.coreObj[remoteCoreId];
    
        SemaphoreP_destruct(&coreObj->newEmptyVringBufSem);
        coreObj->freeQAllocPending = 0;
        RPMessage_queueReset(&coreObj->freeQ);
    }
    
    void RPMessage_forceRecvMsgHandlers(void)
    {
        uint16_t coreId;
    
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            RPMessage_notifyCallback(coreId,
                IPC_NOTIFY_CLIENT_ID_RPMSG,
                RPMESSAGE_MSG_VRING_NEW_FULL,
                NULL);
        }
    }
    
    void RPMessage_controlEndPtHandler(RPMessage_Object *obj, void *arg,
            void *data, uint16_t dataLen,
            uint16_t remoteCoreId, uint16_t remoteEndPt)
    {
        if(gIpcRpmsgCtrl.controlEndPtCallback)
        {
            /* check if message is of correct size */
            if(dataLen == sizeof(RPMessage_AnnounceMsg))
            {
                /* invoke user callback */
                RPMessage_AnnounceMsg *pMsg = (RPMessage_AnnounceMsg*)data;
    
                gIpcRpmsgCtrl.controlEndPtCallback(
                    gIpcRpmsgCtrl.controlEndPtCallbackArgs,
                    remoteCoreId,
                    pMsg->remoteEndPt,
                    pMsg->name
                    );
            }
        }
    }
    
    int32_t RPMessage_controlEndPtInit(void)
    {
        RPMessage_CreateParams createPrms;
        int32_t status;
    
        RPMessage_CreateParams_init(&createPrms);
        createPrms.localEndPt = RPMESSAGE_CTRL_ENDPOINT_ID;
        createPrms.recvCallback = RPMessage_controlEndPtHandler;
        status = RPMessage_construct(&gIpcRpmsgCtrl.controlEndPtObj, &createPrms);
    
        return status;
    }
    
    void RPMessage_controlEndPtDeInit(void)
    {
        RPMessage_destruct(&gIpcRpmsgCtrl.controlEndPtObj);
    }
    
    int32_t  RPMessage_announce(uint16_t remoteCoreId, uint16_t localEndPt, const char* name)
    {
        int32_t status;
        RPMessage_AnnounceMsg msg;
    
        msg.type = 0;
        msg.remoteEndPt = localEndPt; /* local end point will be remote end point for the other side */
        strncpy(msg.name, name, RPMESSAGE_ANNOUNCE_SERVICENAME_LEN-1);
        msg.name[RPMESSAGE_ANNOUNCE_SERVICENAME_LEN-1] = '\0';
    
        status = RPMessage_send(
                    &msg,
                    sizeof(RPMessage_AnnounceMsg),
                    remoteCoreId,
                    RPMESSAGE_CTRL_ENDPOINT_ID, /* control end point on remote side */
                    RPMESSAGE_CTRL_ENDPOINT_ID, /* reply or local end point, set also to control end point */
                    SystemP_WAIT_FOREVER /* wait until message is put in VRING */
        );
        return status;
    }
    
    void RPMessage_controlEndPtCallback(RPMessage_ControlEndPtCallback controlEndPtCallback,
        void  *controlEndPtCallbackArgs)
    {
        uint32_t oldIntState;
    
        oldIntState = HwiP_disable();
    
        gIpcRpmsgCtrl.controlEndPtCallback = controlEndPtCallback;
        gIpcRpmsgCtrl.controlEndPtCallbackArgs = controlEndPtCallbackArgs;
    
        HwiP_restore(oldIntState);
    }
    
    uint32_t RPMessage_isLinuxCore(uint16_t coreId)
    {
        uint32_t isLinuxCore = 0;
    
        if(coreId == gIpcRpmsgCtrl.linuxCoreId && gIpcRpmsgCtrl.linuxResourceTable)
        {
            isLinuxCore = 1;
        }
        return isLinuxCore;
    }
    
    int32_t  RPMessage_waitForLinuxReady(uint32_t timeout)
    {
        int32_t status = SystemP_FAILURE;
        volatile RPMessage_ResourceTable *rscTable = (RPMessage_ResourceTable *)gIpcRpmsgCtrl.linuxResourceTable;
    
        if(rscTable)
        {
            uint32_t elaspedTicks, startTicks = ClockP_getTicks();
            do
            {
                CacheP_inv((void*)rscTable, sizeof(RPMessage_ResourceTable), CacheP_TYPE_ALL);
                if(rscTable->vdev.status == 0x7U)
                {
                    /* linux has initialized the resource table, break out */
                    status = SystemP_SUCCESS;
                }
                if(status != SystemP_SUCCESS)
                {
                    elaspedTicks = ClockP_getTicks() - startTicks;
                    if( elaspedTicks >= timeout)
                    {
                        /* timeout, linux did not init the resource table in user specific timeout time */
                        status = SystemP_TIMEOUT;
                    }
                    if(status != SystemP_TIMEOUT)
                    {
                        /* sleep one tick */
                        ClockP_usleep(ClockP_ticksToUsec(1));
                    }
                }
            } while(status == SystemP_FAILURE);
    
            if(status == SystemP_SUCCESS)
            {
                /* init virtio on linux side */
    
                /* initialize RX VRING */
                RPMessage_vringResetLinux(
                        gIpcRpmsgCtrl.linuxCoreId,
                        0,
                        gIpcRpmsgCtrl.linuxResourceTable);
                /* initialize TX VRING */
                RPMessage_vringResetLinux(
                        gIpcRpmsgCtrl.linuxCoreId,
                        1,
                        gIpcRpmsgCtrl.linuxResourceTable);
    
                /* mark core data structure as initialized, now we can handle interrupts */
                gIpcRpmsgCtrl.isCoreInitialized[gIpcRpmsgCtrl.linuxCoreId] = 1;
            }
        }
    
        return status;
    }
    
    int32_t  RPMessage_init(const RPMessage_Params *params)
    {
        int32_t status = SystemP_SUCCESS;
        uint16_t coreId, localEndPtId;
    
        gIpcRpmsgCtrl.selfCoreId = IpcNotify_getSelfCoreId();
        gIpcRpmsgCtrl.controlEndPtCallback = NULL;
        gIpcRpmsgCtrl.controlEndPtCallbackArgs = NULL;
        gIpcRpmsgCtrl.linuxResourceTable = params->linuxResourceTable;
        gIpcRpmsgCtrl.linuxCoreId = params->linuxCoreId;
        for(localEndPtId = 0; localEndPtId < RPMESSAGE_MAX_LOCAL_ENDPT; localEndPtId++)
        {
            gIpcRpmsgCtrl.localEndPtObj[localEndPtId] = NULL;
        }
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            /* enable a core for RPMessage only when the below is satisfied
             * - valid vring ID is set
             * - not self core ID
             * - IPC Notify with that core is enabled
             */
            gIpcRpmsgCtrl.isCoreEnable[coreId] = 0;
            gIpcRpmsgCtrl.isCoreInitialized[coreId] = 0;
            if(params->vringTxBaseAddr[coreId] != RPMESSAGE_VRING_ADDR_INVALID
                &&
                params->vringRxBaseAddr[coreId] != RPMESSAGE_VRING_ADDR_INVALID
                &&
                coreId != gIpcRpmsgCtrl.selfCoreId
                &&
                IpcNotify_isCoreEnabled(coreId)
              )
            {
                gIpcRpmsgCtrl.isCoreEnable[coreId] = 1;
            }
            if(RPMessage_isLinuxCore(coreId)
                && IpcNotify_isCoreEnabled(coreId)
                )
            {
                gIpcRpmsgCtrl.isCoreEnable[coreId] = 1;
            }
    
        }
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            status |= RPMessage_coreInit(coreId, params);
        }
    
        /* create control end point */
        status |= RPMessage_controlEndPtInit();
    
        IpcNotify_registerClient(IPC_NOTIFY_CLIENT_ID_RPMSG,
            RPMessage_notifyCallback, NULL
            );
    
        return status;
    }
    
    void  RPMessage_deInit(void)
    {
        uint16_t coreId;
    
        IpcNotify_unregisterClient(IPC_NOTIFY_CLIENT_ID_RPMSG);
    
        RPMessage_controlEndPtDeInit();
    
        for(coreId=0; coreId<CSL_CORE_ID_MAX; coreId++)
        {
            RPMessage_coreDeInit(coreId);
        }
    }
    
    

    Regards,

    Ashwin

  • Hello Ashwin,

    Thanks for this proposal, but unfortunately it does not seem to change the "deadlock" that both cores run into. So the second situation is still the same.

    I do not understand the drivers completely, but it seems that the RPMessage_vringPutEmptyRxBuf function sets the value RPMESSAGE_MSG_VRING_NEW_EMPTY to be transmitted, and I could not find this value being used anywhere else. But I do not think that is the main problem, since it looks like an IPC mailbox problem: it is stuck in the loop because the mailbox is full, waiting for it to become "not full". I would not have expected it to get full at all. And the problem occurs even in the receive callback for the RPMessage, where the message was already handled and the IPC should be free again. I guess it is on purpose that this IpcNotify message with the value RPMESSAGE_MSG_VRING_NEW_EMPTY is sent, even if I never see it used inside the callback for receiving RPMessages?

    Best regards

    Felix

  • Hello Ashwin, another update:
    I removed the IpcNotify_sendMsg call inside RPMessage_vringPutEmptyRxBuf since I did not see any use of it at the remote cores (in your updated files, lines 272-276). After removing it, everything seems to work fine. But I am not sure about this.

  • Hi Felix,

    The IpcNotify_sendMsg call inside RPMessage_vringPutEmptyRxBuf is mandatory, since that is how we notify the sender core that a VRING buffer has been freed, so that if any message is pending transmission it can be sent out.

    Can you let me know how to create a sample application to recreate this scenario? I can try to debug further on my side.

    Regards,

    Ashwin

    Hey Ashwin, that's what I also thought, but where does this happen? I cannot see this notification being used anywhere inside RPMessage. But maybe I am just not finding it.

    Since we are using a rather complex system it may not be easy to recreate the same scenario. But we set up the following: RPMessage with 8 buffers of 512 bytes each, using the recvCallback (not the notify) with an argument (a this-pointer to a C++ object which holds the callback that is called in the end as a member function) on two cores. After an IpcNotify sync you just start sending telegrams from each core, maybe up to twenty (I did not count after how many it occurred). Then it should occur.
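
    A stripped-down version of the sending side would look roughly like this (illustrative only; the endpoint define is from my snippet above, the payload and count are made up):

        /* hypothetical repro: both cores run this concurrently after IpcNotify_syncAll() */
        uint8_t telegram[64] = { 0 };
        for (uint32_t n = 0; n < 20; n++)
        {
            /* with timeout 0 this is where one core eventually reports a timeout */
            RPMessage_send(telegram, sizeof(telegram), TARGET_CORE_ID,
                SYSTEM_CHANNEL_ENDPT_ID, SYSTEM_CHANNEL_ENDPT_ID, 0);
        }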

    Edit: I just noticed I didn't mention something that may be an issue here: inside a recvCallback we may also call RPMessage_send! That is, one core requests something and the confirmation is sent from inside the callback. Not sure if this can also have an impact on this behaviour.

    Thanks for your effort!

    Anyway: now that I removed this one IpcNotify_sendMsg, why does it work? We do not lose any packets and everything seems to work just fine. I would have expected a message to pop up that something can't be posted to the VRING buffer, but nothing shows up.

    Best regards

    Felix

  • Hi Felix,

    RecvCallback() gets executed within ISR context, and RPMessage_send() should not be called from within an ISR. This is because RPMessage_send() uses ClockP APIs which can cause a deadlock during timeout measurement. Can you try doing a semaphore post from the RecvCallback(), which unblocks another task that then does the RPMessage_send()?
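
    For illustration, a minimal sketch of that pattern, assuming FreeRTOS with the MCU+ SDK DPL APIs. All names, the queue depth and the message size are made up, the endpoint define is taken from your snippet, and overflow/error handling plus the task creation are omitted:

        #include <string.h>
        #include <kernel/dpl/HwiP.h>
        #include <kernel/dpl/SemaphoreP.h>
        #include <kernel/dpl/SystemP.h>
        #include <drivers/ipc_rpmsg.h>

        #define TX_QUEUE_DEPTH  (16u)
        #define TX_MSG_SIZE     (512u)

        typedef struct
        {
            uint8_t  buf[TX_MSG_SIZE];
            uint16_t len;
            uint16_t remoteCoreId;
        } TxMsg;

        static TxMsg gTxQueue[TX_QUEUE_DEPTH];
        static volatile uint32_t gTxPut = 0, gTxGet = 0;
        static SemaphoreP_Object gTxSem; /* SemaphoreP_constructBinary(&gTxSem, 0) during init */

        /* called from the RPMessage recvCallback (ISR context):
         * only copy the answer into the queue and post, never call RPMessage_send() here */
        void IpcDriver_queueTx(const void *data, uint16_t len, uint16_t remoteCoreId)
        {
            uint32_t key = HwiP_disable();
            TxMsg *m = &gTxQueue[gTxPut % TX_QUEUE_DEPTH];
            memcpy(m->buf, data, len);
            m->len = len;
            m->remoteCoreId = remoteCoreId;
            gTxPut++; /* no overflow check in this sketch, size the queue for the worst case */
            HwiP_restore(key);
            SemaphoreP_post(&gTxSem);
        }

        /* sender task main loop (task context): blocking in RPMessage_send() is fine here */
        void IpcDriver_txTaskMain(void *args)
        {
            while (1)
            {
                SemaphoreP_pend(&gTxSem, SystemP_WAIT_FOREVER);
                while (gTxGet != gTxPut) /* drain everything queued so far */
                {
                    TxMsg *m = &gTxQueue[gTxGet % TX_QUEUE_DEPTH];
                    RPMessage_send(m->buf, m->len, m->remoteCoreId,
                        SYSTEM_CHANNEL_ENDPT_ID, SYSTEM_CHANNEL_ENDPT_ID,
                        SystemP_WAIT_FOREVER);
                    gTxGet++;
                }
            }
        }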

    Regards,

    Ashwin

  • hey Ashwin,

    OK! That sounds understandable. I would recommend adding this to the documentation. I will try to figure out how we can insert such a sender task, since it means some structural change and probably some priority interference with our startup task. But I will test and keep you updated.

    Regards,

    Felix

  • Sure Felix, I'll take an action to update the documentation. Do keep me posted.

    Regards,

    Ashwin

    Hey Ashwin,

    So I managed to create a sender task. This one buffers our send requests.

    This means that when the remote core sends a telegram, the local core will not send the answer inside the interrupt callback but will hand it to the sender task. The sender task just uses a semaphore to get informed about new packets in the buffer and sends them out.

    There is a small problem since our init task has the highest priority; this is also the task sending any requests (not answers to requests). The sender task also got the highest priority. The init task does not use the sender task and sends out telegrams directly. Otherwise, if an answer is generated inside an interrupt, it gets handed to the sender task. Now which one proceeds? If the init task still issues more requests, the remote core will generate 8 answers for 8 interrupts and put them into the sender buffer (which is 8 elements big).

    I know this is hard to think through without getting a brain shock, since I cannot really reconstruct the situation myself.

    But continuing from this situation, it seems to happen that one core runs into

    and the other core asserts because the buffer is full inside the IPC routine. This means the sender task of the second core probably did not get scheduled (since it has the same priority as the init task), so the buffer did not get emptied, and thus the first core, which tried to send those telegrams, is stuck in "buffer is full" since the buffer never gets emptied. At least that's what I imagine from this situation. Also, what I don't get: if the buffer is full, why is an additional message sent which tells that the buffer is full?

    This only works if I create a really big buffer, so the interrupt routine does not run into a full buffer. It is really unfortunate that I need to create an additional buffer which is bigger than the IPC buffer itself.

    So I tried fiddling out a scenario where I can change the priority of the sender task, but this got a bit wild. Without going too deep: we have somewhat more complex task handling on top of FreeRTOS, where the init task creates all other tasks, and those must always have a lower priority; creating a task with the same or a higher priority than the init task would cause a crash. I tried to activate the setPriority possibility in FreeRTOS and raise the priority of the TxTask later, but from here on my CCS went on strike and I could not continue. (Read more at the end.)

    Also, once the IPC mailbox is full I cannot just do a CPU reset via CCS to debug; I need to repower my board to get it working again. Otherwise it is always stuck inside an IPC-full routine.

    I tried calling a reset routine for the reset registers of all mailboxes, as described in the TRM, whenever I start from core 0_0, but unfortunately this does not help.
    I did it like this (a bit dirty, yes, but I can see the registers get reset in the register view):

        if( selfCore->getCoreId() == CSL_CORE_ID_R5FSS0_0 )
        {
            /* reset IPC MAILBOX-registers */
            for(uint32_t i=0; gIpcNotifyMailboxBaseAddr[i]!=0U; i++)
            {
                volatile uint32_t* mailboxPtr =
                    reinterpret_cast<volatile uint32_t*>(gIpcNotifyMailboxBaseAddr[i]);
                mailboxPtr += 4; // advance 4 uint32_t words = offset 0x10, the SYSCONFIG register
                *mailboxPtr = 0x00000001; // set the SOFTRESET bit
                while ((*mailboxPtr & 0x00000001) != 0)
                {
                    /* wait for the soft reset to complete */
                }
            }
            interruptConfigGb = &gIpcNotifyInterruptConfig_r5fss0_0[0];
            interruptConfigNumGb = gIpcNotifyInterruptConfigNum_r5fss0_0;
        }

    This is only for debugging though. How can I make sure that the IPC MAILBOX registers get reset correctly? Currently it does not seem to work via the reset registers.

    Also I had an annoying bug with CCS 12.5: I needed to restart my PC, otherwise CCS was stuck in a state not showing any symbols, neither in the core view nor in the modules view. It just kept showing the 0x00000000 address, even though the disassembly view showed the correct PC and I was able to step. Unfortunately, not having a full call stack makes debugging really hard. I am now continuously restarting CCS or my whole PC after a few loads via CCS. That's why I couldn't continue. I will try my best next week, since I am now on vacation for a few days.

    Best regards

    Felix

  • Hi Felix,

    I have a few questions here:

    1- How are you making sure that the init task doesn't use the sender task? Are you creating the sender task after the initialization is complete, or do you use a different endpoint for initialization messages?

    2- "why is an additional message sent which tells that the buffer is full?"

    Which message are you referring to here? RPMsg sends only two kinds of messages via Notify. The first one is sent when a core has written data to a VRING buffer and wants to notify the remote core about it (this happens within RPMessage_vringPutFullTxBuf()). The second one is sent when a message received from a remote core has been processed and the receiver wants to tell the remote core that free space is available in the VRING for new messages (within RPMessage_vringPutEmptyRxBuf()).

    3- For the reset issue, the above API looks fine. After executing the reset API, what are the values in the Mailbox Msg Status registers? Do they say that all mailboxes are empty?
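
    If it helps, you can dump them with something like the sketch below. The MSGSTATUS_0 offset of 0xC0 and the 16 FIFOs per cluster are how I read the TRM mailbox chapter, and the array type may differ in your generated config, so please double-check:

        #include <stdint.h>
        #include <kernel/dpl/DebugP.h>

        extern uint32_t gIpcNotifyMailboxBaseAddr[]; /* from the SDK's IPC config */

        void dumpMailboxMsgStatus(void)
        {
            for (uint32_t i = 0; gIpcNotifyMailboxBaseAddr[i] != 0U; i++)
            {
                for (uint32_t m = 0; m < 16U; m++) /* 16 FIFOs per cluster, per TRM */
                {
                    /* MSGSTATUS_m holds the number of pending messages in FIFO m;
                     * after a successful reset all of them should read 0 */
                    volatile uint32_t *msgStatus = (volatile uint32_t *)
                        (gIpcNotifyMailboxBaseAddr[i] + 0xC0U + (4U * m));
                    DebugP_log("mailbox %d, fifo %d: %d pending\r\n", i, m, *msgStatus);
                }
            }
        }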

    Regards,

    Ashwin

  • Hey Ashwin,

    So I need to answer the questions in reverse order:

    I start with 3:

    So after a reset at startup it looks like this:

    So that is what I expected. But as soon as it runs into "mailbox full" once, it stays that way, even if I start core 1 again with the reset mechanism. If I understand it right, this flag here tells us it's full:

    Core 1 is then stuck here even after the reset call:

    I need to power-cycle the device to get it working again. But as we can see, the registers are reset beforehand, so I don't get what's going on here. Could it be that the FIFOs are not really reset?

    Thanks for the explanation for 2. But I still don't completely get how each core is informed about full or empty, since I cannot see any of those messages being handled on the other core. Or maybe I just can't find it?

    And for 1: I just called RPMessage_send directly when doing a request.

    But: I found a more reliable solution overall. For now I will just wait for a response to every request, so I do not fire out all requests one after another.

    Then a question came up: you mentioned not to send an answer via RPMessage directly while we are in the RPMessage receive callback. I understand the danger here, especially when we are struggling with full mailbox buffers. But even if we use all 4 cores and have a buffer of 8 elements for RPMessage at each core: since every core waits for the answer to its request before sending the next one, at most one message from each of the 3 remote cores can be in flight towards a given core at any time, which stays well below the 8 available buffers.
    So when I respond directly inside the callback, this should never fill any buffer, and it should normally not get into a full state which could lead to deadlocks. At least that's what I can imagine.

    But of course at runtime, outside the init phase, I will need to find a solution with buffering the answers before they are sent out.

    Also, the issue with CCS was only solved by creating a new workspace. I already opened an e2e thread for it.

    Best regards

    Felix

  • Hi Felix,

    For issue #3, I have filed a bug in the SDK for this and will try to look into it around the 2nd week of December. As you already mentioned, I too think the FIFOs are somehow not getting reset here if a FIFO was full before we do the reset.

    For #2, we don't inform the other core in case of full or empty. Once we process a message from another core, we send back a message saying that free space is available in the VRING. This way, if a message was pending to be sent on the other core because the VRING was full, it can be sent now. This is done to get rid of a busy-wait in a core when the VRING is full: if the VRING is full, we do a semaphore pend, which gets posted when the VRING-available message arrives.

    RPMessage_vringCheckEmptyTxBuf() is the API which gets called at the sender core when the receiver core sends a VRING available message.
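
    Conceptually, the sender side then behaves like the sketch below (a simplified illustration, not the actual SDK source; tryGetEmptyTxBuf() and the semaphore name are stand-ins for the internal implementation):

        #include <stdint.h>
        #include <kernel/dpl/SemaphoreP.h>
        #include <kernel/dpl/SystemP.h>

        static SemaphoreP_Object gVringFreeSem;

        /* stand-in for the internal VRING check; returns SystemP_FAILURE
         * while the TX VRING is full */
        extern int32_t tryGetEmptyTxBuf(void);

        int32_t sendWithPend(uint32_t timeoutTicks)
        {
            while (tryGetEmptyTxBuf() == SystemP_FAILURE) {
                /* no busy-wait: block until the remote core reports
                 * that it freed a VRING buffer */
                if (SemaphoreP_pend(&gVringFreeSem, timeoutTicks) != SystemP_SUCCESS) {
                    return SystemP_TIMEOUT;
                }
            }
            /* ... copy the payload into the buffer and notify the remote core ... */
            return SystemP_SUCCESS;
        }

        /* called when the "VRING available" message arrives from the remote core */
        void onVringAvailable(void)
        {
            SemaphoreP_post(&gVringFreeSem);
        }
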
    For #1, does it work now with the response handling implemented? The concern with calling IPC send from within the callback is that send will wait for a buffer to become available, which increases interrupt latency. Also, send uses ClockP for timeout measurement, so if the priority of the IPC interrupt is higher than that of the clock interrupt, IPC send would hang within the timeout loop.
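
    To illustrate that second point: a timeout measured in ClockP ticks can only expire if the tick interrupt gets a chance to run, roughly like this (a simplified sketch, not the SDK source; bufferIsFull() is a stand-in for the VRING-full check):

        #include <stdint.h>
        #include <kernel/dpl/ClockP.h>
        #include <kernel/dpl/SystemP.h>

        extern int bufferIsFull(void); /* stand-in for the VRING-full check */

        int32_t waitForBuffer(uint32_t timeoutTicks)
        {
            uint32_t start = ClockP_getTicks();

            while (bufferIsFull()) {
                /* if this loop runs in an interrupt with higher priority than
                 * the timer interrupt, ClockP_getTicks() never advances and
                 * this condition can never become true -> the loop hangs */
                if ((ClockP_getTicks() - start) >= timeoutTicks) {
                    return SystemP_TIMEOUT;
                }
            }
            return SystemP_SUCCESS;
        }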

    Regards,

    Ashwin

  • Hey Ashwin,

    Thanks for filing the bug! It seems to only appear from time to time now, but I cannot really tell what its source is. First it happened when we ran into the buffer-full scenarios; there it was mostly the case that a simple CPU reset, a new load, and thus the small reset routine did not reset the FIFOs correctly. But it also happens now from time to time, especially when I'm debugging and halting one core, and I'm not sure why. With the "waiting for response" mechanism it works in general, but the problem can still happen. Since the other core waits for an answer and does not send out new telegrams, no buffer gets full, but still...
    This only (but not always) happens when I halt one core and when I am debug-stepping. It does not happen when the application just runs.

    I have a small problem with the sender task which is related to how FreeRTOS works, so it's not TI-related: it's about juggling priorities between the init task and the sender task. We use C++ objects for tasks which call xTaskCreate inside the constructor, and unfortunately, if both tasks have the same priority, the sender task can start running before its constructor has finished and later aborts because some member variables are not yet initialized. But that's purely a FreeRTOS problem which was also discussed in their forums, so that's on our side to solve, and I'm not yet sure how to proceed here.
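
    One workaround I am considering is two-phase initialization: finish the constructor first and create the task from a separate start() call afterwards. A rough sketch (class and method names are made up, not our actual task wrapper):

        #include "FreeRTOS.h"
        #include "task.h"

        class TxTask
        {
        public:
            TxTask() : handle_(nullptr)
            {
                /* initialize all members here; no task exists yet, so nothing
                 * can observe a half-constructed object */
            }

            void start(UBaseType_t prio)
            {
                /* the object is guaranteed to be fully constructed here */
                xTaskCreate(&TxTask::entry, "TxTask", 1024, this, prio, &handle_);
            }

        private:
            static void entry(void *arg)
            {
                static_cast<TxTask*>(arg)->run();
            }

            void run()
            {
                for (;;) {
                    /* ... pend on the queue and call RPMessage_send ... */
                }
            }

            TaskHandle_t handle_;
        };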

    At least when just letting the application run, I face no problems with sending an answer inside an interrupt, but I may still need to think about a solution for that.

    Thanks for the explanation for #2, that helps to understand the whole situation a bit better!

    I would like to keep the thread open to stay up to date on #1. Is that ok?

    Best regards

    Felix

  • Hi Felix,

    That's fine, we can have the thread open until the issue is resolved. Do let me know if you need any help related to IPC.

    Regards,

    Ashwin

  • Hey Ashwin, another thought that came up regarding the "respond inside an interrupt" topic: wouldn't it also be possible to reuse the VRING packet that was just received as the packet for the response? That would at least save a memcpy and avoid occupying additional space in the VRING. It should also prevent any buffer overflow, since the same space is reused without allocating another buffer. But it also means the core receiving the response would need to know about the packet it should use, since it's not in its receive buffer.

    So I'm not sure if this could be implemented without a big effort and a lot of handling around it.

    Best regards

    Felix

  • Hi Felix,

    This wouldn't be possible to implement, because the RX and TX VRINGs between two cores are separate, unidirectional rings: a buffer received on the remote-to-local ring belongs to that ring and cannot be queued on the local-to-remote ring to carry the response.

    Regards,

    Ashwin