Tool/software:
I am using a AWR1843 based mistral radar. The following python code is being used to get and log data from the radar. How do I get and log the SNR data along with this
import serial
import time
import numpy as np
import datetime
# Change the configuration file name
configFileName = 'AWR1843config.cfg'
CLIport = {}
Dataport = {}
byteBuffer = np.zeros(2**15, dtype='uint8')
byteBufferLength = 0
detObj = {} # Initialize detObj globally for update function
# ------------------------------------------------------------------
# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
global CLIport, Dataport
# Open the serial ports for the configuration and the data ports
# Raspberry pi
#CLIport = serial.Serial('/dev/ttyACM0', 115200)
#Dataport = serial.Serial('/dev/ttyACM1', 921600)
# Windows
CLIport = serial.Serial('/dev/ttyUSB0', 115200)
Dataport = serial.Serial('/dev/ttyUSB1', 921600)
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
CLIport.write((i+'\n').encode())
print(i)
time.sleep(0.01)
return CLIport, Dataport
# ------------------------------------------------------------------
# Function to parse the data inside the configuration file
def parseConfigFile(configFileName):
configParameters = {} # Initialize an empty dictionary to store the configuration parameters
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
# Split the line
splitWords = i.split(" ")
# Hard code the number of antennas, change if other configuration is used
numRxAnt = 4
numTxAnt = 3
# Get the information about the profile configuration
if "profileCfg" in splitWords[0]:
startFreq = int(float(splitWords[2]))
idleTime = int(splitWords[3])
rampEndTime = float(splitWords[5])
freqSlopeConst = float(splitWords[8])
numAdcSamples = int(splitWords[10])
numAdcSamplesRoundTo2 = 1
while numAdcSamples > numAdcSamplesRoundTo2:
numAdcSamplesRoundTo2 = numAdcSamplesRoundTo2 * 2
digOutSampleRate = int(splitWords[11])
# Get the information about the frame configuration
elif "frameCfg" in splitWords[0]:
chirpStartIdx = int(splitWords[1])
chirpEndIdx = int(splitWords[2])
numLoops = int(splitWords[3])
numFrames = int(splitWords[4])
framePeriodicity = float(splitWords[5])
# Combine the read data to obtain the configuration parameters
numChirpsPerFrame = (chirpEndIdx - chirpStartIdx + 1) * numLoops
configParameters["numDopplerBins"] = numChirpsPerFrame / numTxAnt
configParameters["numRangeBins"] = numAdcSamplesRoundTo2
configParameters["rangeResolutionMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * numAdcSamples)
configParameters["rangeIdxToMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * configParameters["numRangeBins"])
configParameters["dopplerResolutionMps"] = 3e8 / (2 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * configParameters["numDopplerBins"] * numTxAnt)
configParameters["maxRange"] = (300 * 0.9 * digOutSampleRate)/(2 * freqSlopeConst * 1e3)
configParameters["maxVelocity"] = 3e8 / (4 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * numTxAnt)
return configParameters
# ------------------------------------------------------------------
# Function to read and parse the incoming data
def readAndParseData18xx(Dataport, configParameters):
global byteBuffer, byteBufferLength
# Constants
OBJ_STRUCT_SIZE_BYTES = 12
BYTE_VEC_ACC_MAX_SIZE = 2**15
MMWDEMO_UART_MSG_DETECTED_POINTS = 1
MMWDEMO_UART_MSG_RANGE_PROFILE = 2
maxBufferSize = 2**15
tlvHeaderLengthInBytes = 8
pointLengthInBytes = 16
magicWord = [2, 1, 4, 3, 6, 5, 8, 7]
# Initialize variables
magicOK = 0 # Checks if magic number has been read
dataOK = 0 # Checks if the data has been read correctly
frameNumber = 0
readBuffer = Dataport.read(Dataport.in_waiting)
byteVec = np.frombuffer(readBuffer, dtype='uint8')
byteCount = len(byteVec)
# Initialize detObj with default values
detObj = {"numObj": 0, "x": np.array([]), "y": np.array([]), "z": np.array([]), "velocity": np.array([])}
# Check that the buffer is not full, and then add the data to the buffer
if (byteBufferLength + byteCount) < maxBufferSize:
byteBuffer[byteBufferLength:byteBufferLength + byteCount] = byteVec[:byteCount]
byteBufferLength = byteBufferLength + byteCount
# Check that the buffer has some data
if byteBufferLength > 16:
# Check for all possible locations of the magic word
possibleLocs = np.where(byteBuffer == magicWord[0])[0]
# Confirm that is the beginning of the magic word and store the index in startIdx
startIdx = []
for loc in possibleLocs:
check = byteBuffer[loc:loc+8]
if np.all(check == magicWord):
startIdx.append(loc)
# Check that startIdx is not empty
if startIdx:
# Remove the data before the first start index
if startIdx[0] > 0 and startIdx[0] < byteBufferLength:
byteBuffer[:byteBufferLength-startIdx[0]] = byteBuffer[startIdx[0]:byteBufferLength]
byteBuffer[byteBufferLength-startIdx[0]:] = np.zeros(len(byteBuffer[byteBufferLength-startIdx[0]:]), dtype='uint8')
byteBufferLength = byteBufferLength - startIdx[0]
# Check that there have no errors with the byte buffer length
if byteBufferLength < 0:
byteBufferLength = 0
# word array to convert 4 bytes to a 32 bit number
word = [1, 2**8, 2**16, 2**24]
# Read the total packet length
totalPacketLen = np.matmul(byteBuffer[12:12+4], word)
# Check that all the packet has been read
if (byteBufferLength >= totalPacketLen) and (byteBufferLength != 0):
magicOK = 1
# If magicOK is equal to 1 then process the message
if magicOK:
# word array to convert 4 bytes to a 32 bit number
word = [1, 2**8, 2**16, 2**24]
# Initialize the pointer index
idX = 0
# Read the header
magicNumber = byteBuffer[idX:idX+8]
idX += 8
version = format(np.matmul(byteBuffer[idX:idX+4], word), 'x')
idX += 4
totalPacketLen = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
platform = format(np.matmul(byteBuffer[idX:idX+4], word), 'x')
idX += 4
frameNumber = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
timeCpuCycles = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
numDetectedObj = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
numTLVs = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
subFrameNumber = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
# Read the TLV messages
for tlvIdx in range(numTLVs):
# word array to convert 4 bytes to a 32 bit number
word = [1, 2**8, 2**16, 2**24]
# Check the header of the TLV message
tlv_type = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
tlv_length = np.matmul(byteBuffer[idX:idX+4], word)
idX += 4
# Read the data depending on the TLV message
if tlv_type == MMWDEMO_UART_MSG_DETECTED_POINTS:
# Initialize the arrays
x = np.zeros(numDetectedObj, dtype=np.float32)
y = np.zeros(numDetectedObj, dtype=np.float32)
z = np.zeros(numDetectedObj, dtype=np.float32)
velocity = np.zeros(numDetectedObj, dtype=np.float32)
for objectNum in range(numDetectedObj):
# Read the data for each object
x[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
y[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
z[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
velocity[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32)
idX += 4
# Store the data in the detObj dictionary
detObj = {"numObj": numDetectedObj, "x": x, "y": y, "z": z, "velocity": velocity}
dataOK = 1
timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")
# Write received data to a log file
with open('Mistral_log.txt', 'a') as log_file:
# log_file.write("Timestamp: {}\n".format(timestamp))
log_file.write("Frame Number: {}\n".format(frameNumber))
log_file.write("Number of Detected Objects: {}\n".format(numDetectedObj))
for obj_idx in range(numDetectedObj):
log_file.write("Timestamp: {}\n".format(timestamp))
log_file.write("Object {}:\n".format(obj_idx))
log_file.write(" x: {}\n".format(detObj["x"][obj_idx]))
log_file.write(" y: {}\n".format(detObj["y"][obj_idx]))
log_file.write(" z: {}\n".format(detObj["z"][obj_idx]))
log_file.write(" velocity: {}\n".format(detObj["velocity"][obj_idx]))
log_file.write("\n")
time.sleep(0.1)
print(obj_idx)
print(numDetectedObj)
# Remove already processed data
if idX > 0 and byteBufferLength > idX:
shiftSize = totalPacketLen
byteBuffer[:byteBufferLength - shiftSize] = byteBuffer[shiftSize:byteBufferLength]
byteBuffer[byteBufferLength - shiftSize:] = np.zeros(len(byteBuffer[byteBufferLength - shiftSize:]), dtype='uint8')
byteBufferLength = byteBufferLength - shiftSize
# Check that there are no errors with the buffer length
if byteBufferLength < 0:
byteBufferLength = 0
return dataOK, frameNumber, detObj
# ------------------------- MAIN -----------------------------------------
# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)
# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)
try:
while True:
# Read and parse the received data
readAndParseData18xx(Dataport, configParameters)
except KeyboardInterrupt:
print("Keyboard interrupt detected. Exiting program.")
# Close the serial ports
CLIport.close()
Dataport.close()
Also attaching below my .cfg config file for reference.
% ***************************************************************
% Created for SDK ver:03.05
% Created using Visualizer ver:3.6.0.0
% Frequency:77
% Platform:xWR18xx
% Scene Classifier:best_range_res
% Azimuth Resolution(deg):15
% Range Resolution(m):0.044
% Maximum unambiguous Range(m):9.02
% Maximum Radial Velocity(m/s):1
% Radial velocity resolution(m/s):0.13
% Frame Duration(msec):100
% RF calibration data:None
% Range Detection Threshold (dB):15
% Doppler Detection Threshold (dB):15
% Range Peak Grouping:enabled
% Doppler Peak Grouping:enabled
% Static clutter removal:disabled
% Angle of Arrival FoV: Full FoV
% Range FoV: Full FoV
% Doppler FoV: Full FoV
% ***************************************************************
sensorStop
flushCfg
dfeDataOutputMode 1
channelCfg 15 5 0
adcCfg 2 1
adcbufCfg -1 0 1 1 1
profileCfg 0 77 429 7 57.14 0 0 70 1 256 5209 0 0 30
chirpCfg 0 0 0 0 0 0 0 1
chirpCfg 1 1 0 0 0 0 0 4
frameCfg 0 1 16 0 100 1 0
lowPower 0 0
guiMonitor -1 1 1 0 0 0 1
cfarCfg -1 0 2 8 4 3 0 15 1
cfarCfg -1 1 0 4 2 3 1 15 1
multiObjBeamForming -1 1 0.5
clutterRemoval -1 0
calibDcRangeSig -1 0 -5 8 256
extendedMaxVelocity -1 0
lvdsStreamCfg -1 0 0 0
compRangeBiasAndRxChanPhase 0.0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0
measureRangeBiasAndRxChanPhase 0 1.5 0.2
CQRxSatMonitor 0 3 5 121 0
CQSigImgMonitor 0 127 4
analogMonitor 0 0
aoaFovCfg -1 -90 90 -90 90
cfarFovCfg -1 0 0 8.92
cfarFovCfg -1 1 -1 1.00
calibData 0 0 0
sensorStart
Regards
Aziz