Hi Team,
I'm currently working with the IWR1443BOOST EVM, is there a python example to read data from the serial ports, I'm unable to get it to work with the following program.
Any python example to read data from this board would be a great starter for me.
import serial import time import numpy as np import pyqtgraph as pg from pyqtgraph.Qt import QtGui # Change the configuration file name configFileName = '1443config.cfg' CLIport = {} Dataport = {} byteBuffer = np.zeros(2**15,dtype = 'uint8') byteBufferLength = 0; # ------------------------------------------------------------------ # Function to configure the serial ports and send the data from # the configuration file to the radar def serialConfig(configFileName): global CLIport global Dataport # Open the serial ports for the configuration and the data ports # Raspberry pi #CLIport = serial.Serial('/dev/ttyACM0', 115200) #Dataport = serial.Serial('/dev/ttyACM1', 921600) # Windows CLIport = serial.Serial('COM4', 115200) Dataport = serial.Serial('COM3', 921600) print(CLIport) print(Dataport) # Read the configuration file and send it to the board config = [line.rstrip('\r\n') for line in open(configFileName)] for i in config: CLIport.write((i+'\n').encode()) print(i) time.sleep(0.01) return CLIport, Dataport # ------------------------------------------------------------------ # Function to parse the data inside the configuration file def parseConfigFile(configFileName): configParameters = {} # Initialize an empty dictionary to store the configuration parameters # Read the configuration file and send it to the board config = [line.rstrip('\r\n') for line in open(configFileName)] for i in config: # Split the line splitWords = i.split(" ") # Hard code the number of antennas, change if other configuration is used numRxAnt = 4 numTxAnt = 3 # Get the information about the profile configuration if "profileCfg" in splitWords[0]: startFreq = int(float(splitWords[2])) idleTime = int(splitWords[3]) rampEndTime = float(splitWords[5]) freqSlopeConst = float(splitWords[8]) numAdcSamples = int(splitWords[10]) numAdcSamplesRoundTo2 = 1; while numAdcSamples > numAdcSamplesRoundTo2: numAdcSamplesRoundTo2 = numAdcSamplesRoundTo2 * 2; digOutSampleRate = int(splitWords[11]); # Get the information about the frame configuration elif "frameCfg" in splitWords[0]: chirpStartIdx = int(splitWords[1]); chirpEndIdx = int(splitWords[2]); numLoops = int(splitWords[3]); numFrames = int(splitWords[4]); framePeriodicity = int(splitWords[5]); # Combine the read data to obtain the configuration parameters numChirpsPerFrame = (chirpEndIdx - chirpStartIdx + 1) * numLoops configParameters["numDopplerBins"] = numChirpsPerFrame / numTxAnt configParameters["numRangeBins"] = numAdcSamplesRoundTo2 configParameters["rangeResolutionMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * numAdcSamples) configParameters["rangeIdxToMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * configParameters["numRangeBins"]) configParameters["dopplerResolutionMps"] = 3e8 / (2 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * configParameters["numDopplerBins"] * numTxAnt) configParameters["maxRange"] = (300 * 0.9 * digOutSampleRate)/(2 * freqSlopeConst * 1e3) configParameters["maxVelocity"] = 3e8 / (4 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * numTxAnt) return configParameters # ------------------------------------------------------------------ # Funtion to read and parse the incoming data def readAndParseData14xx(Dataport, configParameters): global byteBuffer, byteBufferLength # Constants OBJ_STRUCT_SIZE_BYTES = 12; BYTE_VEC_ACC_MAX_SIZE = 2**15; MMWDEMO_UART_MSG_DETECTED_POINTS = 1; MMWDEMO_UART_MSG_RANGE_PROFILE = 2; maxBufferSize = 2**15; magicWord = [2, 1, 4, 3, 6, 5, 8, 7] # Initialize variables magicOK = 0 # Checks if magic number has been read dataOK = 0 # Checks if the data has been read correctly frameNumber = 0 detObj = {} readBuffer = Dataport.read(Dataport.in_waiting) print(readBuffer) byteVec = np.frombuffer(readBuffer, dtype = 'uint8') byteCount = len(byteVec) # Check that the buffer is not full, and then add the data to the buffer if (byteBufferLength + byteCount) < maxBufferSize: byteBuffer[byteBufferLength:byteBufferLength + byteCount] = byteVec[:byteCount] byteBufferLength = byteBufferLength + byteCount # Check that the buffer has some data if byteBufferLength > 16: # Check for all possible locations of the magic word possibleLocs = np.where(byteBuffer == magicWord[0])[0] # Confirm that is the beginning of the magic word and store the index in startIdx startIdx = [] for loc in possibleLocs: check = byteBuffer[loc:loc+8] if np.all(check == magicWord): startIdx.append(loc) # Check that startIdx is not empty if startIdx: # Remove the data before the first start index if startIdx[0] > 0 and startIdx[0] < byteBufferLength: byteBuffer[:byteBufferLength-startIdx[0]] = byteBuffer[startIdx[0]:byteBufferLength] byteBuffer[byteBufferLength-startIdx[0]:] = np.zeros(len(byteBuffer[byteBufferLength-startIdx[0]:]),dtype = 'uint8') byteBufferLength = byteBufferLength - startIdx[0] # Check that there have no errors with the byte buffer length if byteBufferLength < 0: byteBufferLength = 0 # word array to convert 4 bytes to a 32 bit number word = [1, 2**8, 2**16, 2**24] # Read the total packet length totalPacketLen = np.matmul(byteBuffer[12:12+4],word) # Check that all the packet has been read if (byteBufferLength >= totalPacketLen) and (byteBufferLength != 0): magicOK = 1 # If magicOK is equal to 1 then process the message if magicOK: # word array to convert 4 bytes to a 32 bit number word = [1, 2**8, 2**16, 2**24] # Initialize the pointer index idX = 0 # Read the header magicNumber = byteBuffer[idX:idX+8] idX += 8 version = format(np.matmul(byteBuffer[idX:idX+4],word),'x') idX += 4 totalPacketLen = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 platform = format(np.matmul(byteBuffer[idX:idX+4],word),'x') idX += 4 frameNumber = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 timeCpuCycles = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 numDetectedObj = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 numTLVs = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 # UNCOMMENT IN CASE OF SDK 2 #subFrameNumber = np.matmul(byteBuffer[idX:idX+4],word) #idX += 4 # Read the TLV messages for tlvIdx in range(numTLVs): # word array to convert 4 bytes to a 32 bit number word = [1, 2**8, 2**16, 2**24] # Check the header of the TLV message tlv_type = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 tlv_length = np.matmul(byteBuffer[idX:idX+4],word) idX += 4 # Read the data depending on the TLV message if tlv_type == MMWDEMO_UART_MSG_DETECTED_POINTS: # word array to convert 4 bytes to a 16 bit number word = [1, 2**8] tlv_numObj = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 tlv_xyzQFormat = 2**np.matmul(byteBuffer[idX:idX+2],word) idX += 2 # Initialize the arrays rangeIdx = np.zeros(tlv_numObj,dtype = 'int16') dopplerIdx = np.zeros(tlv_numObj,dtype = 'int16') peakVal = np.zeros(tlv_numObj,dtype = 'int16') x = np.zeros(tlv_numObj,dtype = 'int16') y = np.zeros(tlv_numObj,dtype = 'int16') z = np.zeros(tlv_numObj,dtype = 'int16') for objectNum in range(tlv_numObj): # Read the data for each object rangeIdx[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 dopplerIdx[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 peakVal[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 x[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 y[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 z[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) idX += 2 # Make the necessary corrections and calculate the rest of the data rangeVal = rangeIdx * configParameters["rangeIdxToMeters"] dopplerIdx[dopplerIdx > (configParameters["numDopplerBins"]/2 - 1)] = dopplerIdx[dopplerIdx > (configParameters["numDopplerBins"]/2 - 1)] - 65535 dopplerVal = dopplerIdx * configParameters["dopplerResolutionMps"] #x[x > 32767] = x[x > 32767] - 65536 #y[y > 32767] = y[y > 32767] - 65536 #z[z > 32767] = z[z > 32767] - 65536 x = x / tlv_xyzQFormat y = y / tlv_xyzQFormat z = z / tlv_xyzQFormat # Store the data in the detObj dictionary detObj = {"numObj": tlv_numObj, "rangeIdx": rangeIdx, "range": rangeVal, "dopplerIdx": dopplerIdx, \ "doppler": dopplerVal, "peakVal": peakVal, "x": x, "y": y, "z": z} dataOK = 1 # Remove already processed data if idX > 0 and byteBufferLength > idX: shiftSize = totalPacketLen byteBuffer[:byteBufferLength - shiftSize] = byteBuffer[shiftSize:byteBufferLength] byteBuffer[byteBufferLength - shiftSize:] = np.zeros(len(byteBuffer[byteBufferLength - shiftSize:]),dtype = 'uint8') byteBufferLength = byteBufferLength - shiftSize # Check that there are no errors with the buffer length if byteBufferLength < 0: byteBufferLength = 0 return dataOK, frameNumber, detObj # ------------------------------------------------------------------ # Funtion to update the data and display in the plot def update(): dataOk = 0 global detObj x = [] y = [] # Read and parse the received data dataOk, frameNumber, detObj = readAndParseData14xx(Dataport, configParameters) if dataOk and len(detObj["x"]) > 0: #print(detObj) x = -detObj["x"] y = detObj["y"] s.setData(x,y) #QtGui.QApplication.processEvents() print(str(dataOk)+" frame number" + str(frameNumber)) return dataOk # ------------------------- MAIN ----------------------------------------- # Configurate the serial port CLIport, Dataport = serialConfig(configFileName) # Get the configuration parameters from the configuration file configParameters = parseConfigFile(configFileName) # START QtAPPfor the plot #app = QtGui.QApplication([]) # Set the plot #pg.setConfigOption('background','w') #win = pg.GraphicsWindow(title="2D scatter plot") #p = win.addPlot() #p.setXRange(-0.5,0.5) #p.setYRange(0,1.5) #p.setLabel('left',text = 'Y position (m)') #p.setLabel('bottom', text= 'X position (m)') #s = p.plot([],[],pen=None,symbol='o') # Main loop detObj = {} frameData = {} currentIndex = 0 while True: try: # Update the data and check if the data is okay dataOk = update() if dataOk: # Store the current frame into frameData frameData[currentIndex] = detObj currentIndex += 1 print("im here") time.sleep(0.033) # Sampling frequency of 30 Hz # Stop the program and close everything if Ctrl + c is pressed except KeyboardInterrupt: CLIport.write(('sensorStop\n').encode()) CLIport.close() Dataport.close() win.close() break