This thread has been locked.

If you have a related question, please click the "Ask a related question" button in the top right corner. The newly created question will be automatically linked to this question.

AWR1642BOOST: iwr1642

Part Number: AWR1642BOOST

Hi adnan is here i run and find range with MATLAB already for people counting demo but now i want to shift my code with python please any one help me 

i found one code but that code is not providing exact data 

here is that code

import serial
#import imaplib
import time
import math
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui


# Change the configuration file name
configFileName = 'mmw_pplcount_demo_default.cfg'
CLIport = {}
Dataport = {}
byteBuffer = np.zeros(2**15,dtype = 'uint8')
byteBufferLength = 0;


# ------------------------------------------------------------------

# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):

global CLIport
global Dataport
# Open the serial ports for the configuration and the data ports

# Raspberry pi
## CLIport = serial.Serial('/dev/ttyACM0', 115200)
## Dataport = serial.Serial('/dev/ttyACM1', 921600)

# Windows
CLIport = serial.Serial(port='COM3',baudrate=115200,timeout=2)
Dataport = serial.Serial(port='COM4',baudrate=921600, timeout=2)

Dataport.close()
Dataport.open()

# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
CLIport.write((i+'\n').encode())
print(i)
time.sleep(0.01)

return CLIport, Dataport

# ------------------------------------------------------------------

# Function to parse the data inside the configuration file
def parseConfigFile(configFileName):
configParameters = {} # Initialize an empty dictionary to store the configuration parameters

# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:

# Split the line
splitWords = i.split(" ")
print('splitWords',splitWords)

# Hard code the number of antennas, change if other configuration is used
numRxAnt = 4
numTxAnt = 2

# Get the information about the profile configuration
if "profileCfg" in splitWords[0]:
startFreq = int(float(splitWords[2]))
idleTime = int(splitWords[3])
rampEndTime = float(splitWords[5])
freqSlopeConst = float(splitWords[8])
numAdcSamples = int(splitWords[10])
digOutSampleRate = int(splitWords[11])
numAdcSamplesRoundTo2 = 1;

while numAdcSamples > numAdcSamplesRoundTo2:
numAdcSamplesRoundTo2 = numAdcSamplesRoundTo2 * 2;

digOutSampleRate = int(splitWords[11]);

# Get the information about the frame configuration
elif "frameCfg" in splitWords[0]:

chirpStartIdx = int(splitWords[1]);
chirpEndIdx = int(splitWords[2]);
numLoops = int(splitWords[3]);
numFrames = int(splitWords[4]);
framePeriodicity = int(splitWords[5]);


# Combine the read data to obtain the configuration parameters
numChirpsPerFrame = (chirpEndIdx - chirpStartIdx + 1) * numLoops
configParameters["numDopplerBins"] = numChirpsPerFrame / numTxAnt
configParameters["numRangeBins"] = numAdcSamplesRoundTo2
configParameters["rangeResolutionMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * numAdcSamples)
configParameters["rangeIdxToMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * configParameters["numRangeBins"])
configParameters["dopplerResolutionMps"] = 3e8 / (2 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * configParameters["numDopplerBins"] * numTxAnt)
configParameters["maxRange"] = (300 * 0.9 * digOutSampleRate)/(2 * freqSlopeConst * 1e3)
configParameters["maxVelocity"] = 3e8 / (4 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * numTxAnt)

return configParameters
# ------------------------------------------------------------------

# Funtion to read and parse the incoming data
def readAndParseData16xx(Dataport, configParameters):
global byteBuffer, byteBufferLength

# Constants
OBJ_STRUCT_SIZE_BYTES = 12;
BYTE_VEC_ACC_MAX_SIZE = 2**15;
MMWDEMO_UART_MSG_POINT_CLOUD_2D = 6;
MMWDEMO_UART_MSG_TARGET_LIST_2D = 7;
MMWDEMO_UART_MSG_TARGET_INDEX_2D = 8;
maxBufferSize = 2**15;
tlvHeaderLengthInBytes = 8;
pointLengthInBytes = 16;
targetLengthInBytes = 68;
magicWord = [2, 1, 4, 3, 6, 5, 8, 7]

# Initialize variables

magicOK = 0 # Checks if magic number has been read
dataOK = 0 # Checks if the data has been read correctly
frameNumber = 0
targetObj = {}
pointObj={}

readBuffer = Dataport.read(Dataport.in_waiting)
byteVec = np.frombuffer(readBuffer, dtype = 'uint8')
print('byteVec',byteVec)
byteCount = len(byteVec)

# Check that the buffer is not full, and then add the data to the buffer

if (byteBufferLength + byteCount) < maxBufferSize:
print('byteBufferLength',byteCount)
print('byteCount',byteCount)
byteBuffer[byteBufferLength:byteBufferLength + byteCount] = byteVec[:byteCount]
print('byteBufferafter',byteBuffer[byteBufferLength:byteBufferLength + byteCount])

byteBufferLength = byteBufferLength + byteCount

# Check that the buffer has some data

if byteBufferLength > 16:

# Check for all possible locations of the magic word
possibleLocs = np.where(byteBuffer == magicWord[0])[0]

# Confirm that is the beginning of the magic word and store the index in startIdx
startIdx = []
for loc in possibleLocs:
check = byteBuffer[loc:loc + 8]
if np.all(check == magicWord):
startIdx.append(loc)


# Check that startIdx is not empty
if startIdx:

# Remove the data before the first start index
if startIdx[0] > 0:
byteBuffer[:byteBufferLength - startIdx[0]] = byteBuffer[startIdx[0]:byteBufferLength]
byteBufferLength = byteBufferLength - startIdx[0]

# Check that there have no errors with the byte buffer length
if byteBufferLength < 0:
byteBufferLength = 0

# word array to convert 4 bytes to a 32 bit number
word = [1, 2 ** 8, 2 ** 16, 2 ** 24]


# Read the total packet length
totalPacketLen = np.matmul(byteBuffer[20:20 + 4], word)

# Check that all the packet has been read
if (byteBufferLength >= totalPacketLen) and (byteBufferLength != 0):
magicOK = 1

# If magicOK is equal to 1 then process the message
if magicOK:
# word array to convert 4 bytes to a 32 bit number
word = [1, 2 ** 8, 2 ** 16, 2 ** 24]

# Initialize the pointer index
idX = 0

# Read the header
# Read the header
magicNumber = byteBuffer[idX:idX + 8]
print('magicNumber',magicNumber)

idX += 8

version = format(np.matmul(byteBuffer[idX:idX + 4], word), 'x')
idX += 4
platform = format(np.matmul(byteBuffer[idX:idX + 4], word), 'x')
idX += 4
timeStamp = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
totalPacketLen = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
frameNumber = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
subFrameNumber = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
chirpMargin = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
frameMargin = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
uartSentTime = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
trackProcessTime = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4

word = [1, 2 ** 8]
#print('idX',byteBuffer[idX:idX + 2])
numTLVs = np.matmul(byteBuffer[48:48 + 2], word)

print('numTLVs',numTLVs)
idX += 2
checksum = np.matmul(byteBuffer[idX:idX + 2], word)
idX += 2

# Read the TLV messages
for tlvIdx in range(numTLVs):
# word array to convert 4 bytes to a 32 bit number
word = [1, 2 ** 8, 2 ** 16, 2 ** 24]

# Check the header of the TLV message
print('word',word)
print('byte buf',byteBuffer[idX:idX + 4])

tlv_type = np.matmul(byteBuffer[idX:idX + 4], word)
print('tlvtype',tlv_type)
idX += 4
tlv_length = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4

# Read the data depending on the TLV message
if tlv_type == MMWDEMO_UART_MSG_POINT_CLOUD_2D:
# word array to convert 4 bytes to a 16 bit number
word = [1, 2 ** 8, 2 ** 16, 2 ** 24]

# Calculate the number of detected points
numInputPoints = (tlv_length - tlvHeaderLengthInBytes) // pointLengthInBytes

# Initialize the arrays
rangeVal = np.zeros(numInputPoints, dtype=np.float32)
azimuth = np.zeros(numInputPoints, dtype=np.float32)
dopplerVal = np.zeros(numInputPoints, dtype=np.float32)
snr = np.zeros(numInputPoints, dtype=np.float32)
for objectNum in range(numInputPoints):
# Read the data for each object
rangeVal[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
azimuth[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
dopplerVal[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
snr[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4

# Store the data in the detObj dictionary
pointObj = {"numObj": numInputPoints, "range": rangeVal, "azimuth": azimuth,\
"doppler": dopplerVal, "snr": snr}

elif tlv_type == MMWDEMO_UART_MSG_TARGET_LIST_2D:

# word array to convert 4 bytes to a 16 bit number
word = [1, 2 ** 8, 2 ** 16, 2 ** 24]

# Calculate the number of target points
numTargetPoints = (tlv_length - tlvHeaderLengthInBytes) // targetLengthInBytes


# Initialize the arrays
targetId = np.zeros(numTargetPoints, dtype=np.uint32)
posX = np.zeros(numTargetPoints, dtype=np.float32)
posY = np.zeros(numTargetPoints, dtype=np.float32)
velX = np.zeros(numTargetPoints, dtype=np.float32)
velY = np.zeros(numTargetPoints, dtype=np.float32)
accX = np.zeros(numTargetPoints, dtype=np.float32)
accY = np.zeros(numTargetPoints, dtype=np.float32)
EC = np.zeros((3, 3, numTargetPoints), dtype=np.float32) # Error covariance matrix
G = np.zeros(numTargetPoints, dtype=np.float32) # Gain

for objectNum in range(numTargetPoints):
# Read the data for each object
targetId[objectNum] = np.matmul(byteBuffer[idX:idX + 4], word)
idX += 4
posX[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
posY[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
velX[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
velY[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
accX[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
accY[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[0, 0, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[0, 1, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[0, 2, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[1, 0, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[1, 1, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[1, 2, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[2, 0, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[2, 1, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
EC[2, 2, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
G[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4


# Store the data in the detObj dictionary
targetObj = {"targetId": targetId, "posX": posX, "posY": posY, \
"velX": velX, "velY": velY, "accX": accX, "accY": accY, \
"EC": EC, "G": G}

elif tlv_type == MMWDEMO_UART_MSG_TARGET_INDEX_2D:
# Calculate the length of the index message
numIndices = tlv_length - tlvHeaderLengthInBytes
indices = byteBuffer[idX:idX + numIndices]
print('indices',indices)
idX += numIndices
dataOK = 1


# Remove already processed data
if idX > 0 and dataOK == 1:
shiftSize = idX
byteBuffer[:byteBufferLength - shiftSize] = byteBuffer[shiftSize:byteBufferLength]
byteBufferLength = byteBufferLength - shiftSize

# Check that there are no errors with the buffer length
if byteBufferLength < 0:
byteBufferLength = 0
# print('buffer length',byteBufferLength)
#print('targetObj',targetObj)
#print('dataOK',dataOK)

return dataOK, frameNumber, pointObj,targetObj

# ------------------------------------------------------------------

# Funtion to update the data and display in the plot
def update():

dataOk = 0
global targetObj
global pointObj
x = []
y = []

# Read and parse the received data
dataOk, frameNumber, pointObj,targetObj = readAndParseData16xx(Dataport, configParameters)

if dataOk:


#if targetObj:
x = -targetObj["posX"]
y = targetObj["posY"]
#if pointObj:
#x=-pointObj["range"]*np.sin(pointObj["azimuth"])
#y=pointObj["range"]*np.cos(pointObj["azimuth"])

s.setData(x,y)
QtGui.QApplication.processEvents()

return dataOk


# ------------------------- MAIN -----------------------------------------

# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)

# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)

# START QtAPPfor the plot
app = QtGui.QApplication([])

# Set the plot
pg.setConfigOption('background','w')
win = pg.GraphicsWindow(title="2D scatter plot")
p = win.addPlot()
p.setXRange(-0.5,0.5)
p.setYRange(0,1.5)
p.setLabel('left',text = 'Y position (m)')
p.setLabel('bottom', text= 'X position (m)')
s = p.plot([],[],pen=None,symbol='o')


# Main loop
targetObj = {}
pointObj={}
frameData = {}
currentIndex = 0
while True:
try:
# Update the data and check if the data is okay
dataOk = update()

if dataOk:
# Store the current frame into frameData
frameData[currentIndex] = targetObj
currentIndex += 1

time.sleep(0.033) # Sampling frequency of 30 Hz

# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
#CLIport.write(('sensorStop\n').encode())
CLIport.close()
Dataport.close()
win.close()
break

  • Hi Adnan,

    It would be good if you go over the people counting demo GUI source code and understand the flow then migrate to another coding language.

    Regards,

    Jitendra

  • Dear i alredy understnd that code but now i want obstacle detection for that purpose i need python code and there is alot of problem in this board

    On Thursday, September 26, 2019, Jitendra Gupta <bounce-1717369@mail.e2e.ti.com> wrote:

     

    A Message from the TI E2E™ support forums
    Texas Instruments

     

    Jitendra Gupta replied to AWR1642BOOST: iwr1642.

    Hi Adnan,

    It would be good if you go over the people counting demo GUI source code and understand the flow then migrate to another coding language.

    Regards,

    Jitendra

     

     

    You received this notification because you subscribed to the forum.  To unsubscribe from only this thread, go here.

    Flag this post as spam/abuse.

    refid:6f50e061-401f-42b3-939b-bde682028d64

  • Hi Adnan,

    My recommendation is to write a parser function that can parse one frame of data.  The data output is described in the people counting user's guide. You can then call this function every frame period (50 ms in the people counting demo). If running PyQT, I recommend parsing the data in a QT thread, as there will be a lot of downtime in the parsing function while you are waiting for all the data to come across UART. Once you have the data, you can use a graphing function of your choice:

    1. Matplotlib
    2. Vispy
    3. Pyqtgraph

    Because you do not have a specific question, I am closing this thread.

    Regards,

    Justin