TI mmwaveISK 6843 with raspberry PI source codeShare/TI Radar 2021. 10. 11. 14:41
It's for the connection between TI mmwave ISK 6843 interfaces and Raspberrypi4 (raspbian)
This code is based on github, https://github.com/ibaiGorordo/AWR1642-Read-Data-Python-MMWAVE-SDK-2
GitHub - ibaiGorordo/AWR1642-Read-Data-Python-MMWAVE-SDK-2: Python program to read and plot the data in real time from the AWR16
Python program to read and plot the data in real time from the AWR1642 and IWR1642 mmWave radar boards (Texas Instruments). - GitHub - ibaiGorordo/AWR1642-Read-Data-Python-MMWAVE-SDK-2: Python pro...
However, the obove is for only ISK 1682 interface message frame which is a little bit differ from the ISK6843.
So I changed the source code for 6843
Just let me share the source code first, and I would share the specific contents soon for understanding.
#try to connect to DB #PointClound Completed 21.05.27 #Simple Logic then DB import mysql.connector; import struct from datetime import datetime import serial import time import numpy as np import pyqtgraph as pg from pyqtgraph.Qt import QtGui import math import threading Maria = mysql.connector.connect(host="", user="root", passwd="raspberry", database="sensor1"); Cursor = Maria.cursor(prepared=True) # Change the configuration file name # configFileName = 'mmw_pplcount_demo_default.cfg' configFileName = 'ppCntCfg.cfg' # configFileName = '1443config.cfg' # configFileName = 'selfConfig1.cfg' CLIport = {} Dataport = {} byteBuffer = np.zeros(2**15,dtype = 'uint8') byteBufferLength = 0; # ------------------------------------------------------------------ p_cnt = 0 ; def printTest(): # now_time = '({})'.format(datetime.today().strftime('%Y-%m-%d %H:%M:%S')) # print(now_time) sql = "INSERT INTO data Values(%s,%s,%s)" Cursor.execute(sql,( 'Check Time Saved ', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'chk') ) Maria.commit(); # threading.Timer(10,printTest).start() # Function to configure the serial ports and send the data from # the configuration file to the radar def serialConfig(configFileName): global CLIport global Dataport # Open the serial ports for the configuration and the data ports # Raspberry pi CLIport = serial.Serial('/dev/ttyUSB0', 115200) Dataport = serial.Serial('/dev/ttyUSB1', 921600) # Windows #CLIport = serial.Serial('COM3', 115200) #Dataport = serial.Serial('COM4', 921600) # Read the configuration file and send it to the board config = [line.rstrip('\r\n') for line in open(configFileName)] for i in config: CLIport.write((i+'\n').encode()) print(i) time.sleep(0.01) return CLIport, Dataport # ------------------------------------------------------------------ # Function to parse the data inside the configuration file def parseConfigFile(configFileName): configParameters = {} # Initialize an empty dictionary to store the configuration parameters # Read the configuration file and send it to the board config = [line.rstrip('\r\n') for line in open(configFileName)] for i in config: # Split the line splitWords = i.split(" ") # Hard code the number of antennas, change if other configuration is used numRxAnt = 4 numTxAnt = 3 #jimmy revised # Get the information about the profile configuration if "profileCfg" in splitWords[0]: startFreq = int(float(splitWords[2])) idleTime = int(float(splitWords[3])) #j rampEndTime = float(splitWords[5]) freqSlopeConst = float(splitWords[8]) numAdcSamples = int(splitWords[10]) digOutSampleRate = int(float(splitWords[11])) numAdcSamplesRoundTo2 = 1; while numAdcSamples > numAdcSamplesRoundTo2: numAdcSamplesRoundTo2 = numAdcSamplesRoundTo2 * 2; digOutSampleRate = int(float(splitWords[11])); # Get the information about the frame configuration elif "frameCfg" in splitWords[0]: chirpStartIdx = int(float(splitWords[1]));#j chirpEndIdx = int(splitWords[2]); numLoops = int(float(splitWords[3])); #j numFrames = int(splitWords[4]); framePeriodicity = int(float(splitWords[5])); # Combine the read data to obtain the configuration parameters numChirpsPerFrame = (chirpEndIdx - chirpStartIdx + 1) * numLoops configParameters["numDopplerBins"] = numChirpsPerFrame / numTxAnt configParameters["numRangeBins"] = numAdcSamplesRoundTo2 configParameters["rangeResolutionMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * numAdcSamples) configParameters["rangeIdxToMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * configParameters["numRangeBins"]) configParameters["dopplerResolutionMps"] = 3e8 / (2 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * configParameters["numDopplerBins"] * numTxAnt) configParameters["maxRange"] = (300 * 0.9 * digOutSampleRate)/(2 * freqSlopeConst * 1e3) configParameters["maxVelocity"] = 3e8 / (4 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * numTxAnt) return configParameters # ------------------------------------------------------------------ # Funtion to read and parse the incoming data def readAndParseData16xx(Dataport, configParameters): global byteBuffer, byteBufferLength # Constants OBJ_STRUCT_SIZE_BYTES = 12; BYTE_VEC_ACC_MAX_SIZE = 2**15; MMWDEMO_UART_MSG_POINT_CLOUD_3D = 6; # renamed 2D -> 3D MMWDEMO_UART_MSG_TARGET_LIST_2D = 7; MMWDEMO_UART_MSG_TARGET_INDEX_2D = 8; maxBufferSize = 2**15; tlvHeaderLengthInBytes = 8; pointLengthInBytes = 8; # modified targetLengthInBytes = 68; magicWord = [2, 1, 4, 3, 6, 5, 8, 7] # Initialize variables magicOK = 0 # Checks if magic number has been read dataOK = 0 # Checks if the data has been read correctly targetDetected = 0 # Checks if a person has been detected frameNumber = 0 targetObj = {} pointObj = {} readBuffer = Dataport.read(Dataport.in_waiting) # print('just Read : {0}'.format(readBuffer)) byteVec = np.frombuffer(readBuffer, dtype = 'uint8') byteCount = len(byteVec) print('inital byteVec:{0}'.format(byteVec)) # Check that the buffer is not full, and then add the data to the buffer if (byteBufferLength + byteCount) < maxBufferSize: # print('check buffer, add data -> buffer ') byteBuffer[byteBufferLength:byteBufferLength + byteCount] = byteVec[:byteCount] #understand? # print(byteBufferLength) # print(byteBufferLength + byteCount) # print(byteBuffer[40])#104 # print('maybe vec and buff be samed{0}'.format(byteBuffer[0:byteCount])) # print(len(byteBuffer)) # print(len(byteVec)) byteBufferLength = byteBufferLength + byteCount # Check that the buffer has some data if byteBufferLength > 16: # print('already some data ') # Check for all possible locations of the magic word possibleLocs = np.where(byteBuffer == magicWord[0])[0] # Confirm that is the beginning of the magic word and store the index in startIdx startIdx = [] for loc in possibleLocs: check = byteBuffer[loc:loc + 8] if np.all(check == magicWord): startIdx.append(loc) # Check that startIdx is not empty if startIdx: # Remove the data before the first start index if startIdx[0] > 0 and startIdx[0] < byteBufferLength: byteBuffer[:byteBufferLength - startIdx[0]] = byteBuffer[startIdx[0]:byteBufferLength] byteBuffer[byteBufferLength-startIdx[0]:] = np.zeros(len(byteBuffer[byteBufferLength-startIdx[0]:]),dtype = 'uint8') byteBufferLength = byteBufferLength - startIdx[0] # Check that there have no errors with the byte buffer length if byteBufferLength < 0: byteBufferLength = 0 # word array to convert 4 bytes to a 32 bit number word = [1, 2 ** 8, 2 ** 16, 2 ** 24] # Read the total packet length # totalPacketLen = np.matmul(byteBuffer[20:20 + 4], word) totalPacketLen = np.matmul(byteBuffer[12:12 + 4], word) # Check that all the packet has been read if (byteBufferLength >= totalPacketLen) and (byteBufferLength != 0): # print("now magicOk") magicOK = 1 # If magicOK is equal to 1 then process the message if magicOK: # word array to convert 4 bytes to a 32 bit number word = [1, 2 ** 8, 2 ** 16, 2 ** 24] # print("magic okay") # Initialize the pointer index idX = 0 # Read the header # Read the header #why do this ?? print('current bytebuffer now will cook: {0}'.format(byteBuffer[startIdx[0]:byteBufferLength])) magicNumber = byteBuffer[idX:idX + 8] idX += 8 # print('m_n:{0}'.format(magicNumber)) # print('now idx:{0}'.format(idX)) # print('magicNumber:{0}'.format(magicNumber)) # print('np.matmul(byteBuffer[idX:idX + 4], word):{0}'.format(np.matmul(byteBuffer[idX:idX + 4], word))) # print('b_b:{0}'.format(byteBuffer)) # print('what? header :{0}'.format(byteBuffer[0:13])) version = format(np.matmul(byteBuffer[idX:idX + 4], word), 'x') idX += 4 # print('pure_buf_version :{0}'.format(byteBuffer[idX:idX+4])) #4053 # print('version_re:{0}'.format(format(np.matmul(byteBuffer[8:12], word), 'x'))) # print('just_version : {0}'.format(np.matmul(bytebuffer[idX:idX+4], word))) # print('version:{0}'.format(version)) #30 50 00 4 totalPacketLen = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('totalPacketLen:{0}'.format(totalPacketLen)) platform = format(np.matmul(byteBuffer[idX:idX + 4], word), 'x') idX += 4 # print('platform:{0}'.format(platform)) frameNumber = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('frameNumber:{0}'.format(frameNumber)) subFrameNumber = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('subFrameNumber:{0}'.format(subFrameNumber)) chirpMargin = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('chirpMargin:{0}'.format(chirpMargin)) frameMargin = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('frameMargin:{0}'.format(frameMargin)) trackProcessTime = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('trackProcessTime:{0}'.format(trackProcessTime)) uartSentTime = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('uartSentTime:{0}'.format(uartSentTime)) word = [1, 2 ** 8] #maybe cus 2bytes matmul with numTLv below numTLVs = np.matmul(byteBuffer[idX:idX + 2], word) idX += 2 # print('numTLVs:{0}'.format(numTLVs)) checksum = np.matmul(byteBuffer[idX:idX + 2], word) idX += 2 # print('checksum:{0}'.format(checksum)) # print('done read headData') #actually no! any! correlation with byteBuffer ....-_-;;; # print('start byte buffer:{0}'.format(byteBuffer)) # Read the TLV messages # print('for start in range {0}'.format(numTLVs)) for tlvIdx in range(numTLVs): # word array to convert 4 bytes to a 32 bit number word = [1, 2 ** 8, 2 ** 16, 2 ** 24] # Initialize the tlv type tlv_type = 0 # print("idx:{0}".format(idx)) try: # Check the header of the TLV message # print("must come here ") # print('tlv_type_idx start:{0}'.format(idX)) tlv_type = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('13_tlv_type:{0}'.format(tlv_type)) tlv_length = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 # print('{0}'.format(tlv_length)) except: pass #only here # Read the data depending on the TLV message if tlv_type == MMWDEMO_UART_MSG_POINT_CLOUD_3D: # HERE HERE # word array to convert 4 bytes to a 16 bit number # print("TLV message : pointclound2D IN") # print("pointCloudStart") word = [1, 2 ** 8, 2 ** 16, 2 ** 24] # Calculate the number of detected points numInputPoints = (tlv_length - tlvHeaderLengthInBytes) // pointLengthInBytes #sth wrong from here print('numOfInputPoint :{0}'.format(numInputPoints)) # rangeVal = np.zeros(numInputPoints, dtype=np.float32) # azimuth = np.zeros(numInputPoints, dtype=np.float32) # dopplerVal = np.zeros(numInputPoints, dtype=np.float32) # snr = np.zeros(numInputPoints, dtype=np.float32) elevation = np.zeros(numInputPoints, dtype=np.float32) azimuth = np.zeros(numInputPoints, dtype=np.float32) dopplerVal = np.zeros(numInputPoints, dtype=np.float32) rangeVal = np.zeros(numInputPoints, dtype=np.float32) snr = np.zeros(numInputPoints, dtype=np.float32) # nonlocal idx # print('should be initialized elevationUnit{0}azimuthUnit{1}dopplerUnit{2}dopplerUnit{3}snrUnit{4}'.format(elevationUnit,azimuthUnit,dopplerUnit,rangeUnit,snrUnit)) # print('before byteBuffer{0}'.format(byteBuffer)) # pUnitStruct = '5f' # pUnitSize = struct.calcsize(pUnitStruct) # pUnit = struct.unpack(pUnitStruct, byteVec[:pUnitSize]) #byteVec ?? #add by jimmy el_val = np.zeros(numInputPoints, dtype=np.uint8) az_val = np.zeros(numInputPoints, dtype=np.uint8) dop_val = np.zeros(numInputPoints, dtype=np.uint16) range_val = np.zeros(numInputPoints, dtype=np.uint16) snr_val = np.zeros(numInputPoints, dtype=np.uint16) #add by Q ele_unit = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('ele_unit_buffer:{0}'.format(byteBuffer[idX:idX + 4])) # print('ele_unit:{0}'.format(ele_unit)) idX += 4 azi_unit = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('azi_unit_buffer:{0}'.format(byteBuffer[idX:idX + 4])) # print('azi_unit:{0}'.format(azi_unit)) idX += 4 dop_unit = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('dop_unit_buffer:{0}'.format(byteBuffer[idX:idX + 4])) # print('dop_unit:{0}'.format(dop_unit)) idX += 4 rng_unit = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('rng_unit_buffer:{0}'.format(byteBuffer[idX:idX + 4])) # print('rng_unit:{0}'.format(rng_unit)) idX += 4 snr_unit = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('snr_unit_buffer:{0}'.format(byteBuffer[idX:idX + 4])) # print('snr_unit:{0}'.format(snr_unit)) idX += 4 for objectNum in range(numInputPoints): elevation[objectNum] = byteBuffer[idX].view(dtype=np.int8)*ele_unit # print('elevation[{0}] :{1}'.format(objectNum, elevation[objectNum])) idX += 1 azimuth[objectNum] = byteBuffer[idX].view(dtype=np.int8)*azi_unit # print('azimuth[{0}] :{1}'.format(objectNum, azimuth[objectNum])) idX += 1 dopplerVal[objectNum] = byteBuffer[idX:idX+2].view(dtype=np.int16)*dop_unit # print('dopplerVal[{0}] :{1}'.format(objectNum, dopplerVal[objectNum])) idX += 2 rangeVal[objectNum] = byteBuffer[idX:idX+2].view(dtype=np.uint16)*rng_unit # print('rangeVal[{0}] :{1}'.format(objectNum, rangeVal[objectNum])) idX += 2 snr[objectNum] = byteBuffer[idX:idX+2].view(dtype=np.uint16)*snr_unit # print('snr[{0}] :{1}'.format(objectNum, snr[objectNum])) idX += 2 # #only Logic # Read the data for each object #is it right idx position ? # # print("current idx : {0}".format(idx)) # # print("idX before elevation:{0}".format(idX)) # # print('zero_ele:{0}'.format(elevation)) # print('objNum:{0}'.format(objectNum)) # elevation[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) # # elevation = byteBuffer[idX:idX + 4].view(dtype=np.float32) # # what = np.array([10,215,35,60]).view(dtype=np.float32) # # test_ele = elevation # print('ele_buffer :{0}'.format(byteBuffer[idX:idX+4])) # # print('test_ele:{0}'.format(test_ele)) # # print('wtf:{0}'.format(what)) # # print('shape:{0}'.format(byteBuffer.shape)) # # print('type:{0}'.format(type(byteBuffer))) # # if(np.array([10,215,35,60]) == byteBuffer[idX:idX+4]): # # print('SAME!!!!') # idX += 4 # # print(pUnit[0]) # print('elevation[objectNum]:{0}'.format(elevation[objectNum])) # # print('elevation[objectNum]:{0}'.format(elevation)) # rangeVal[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('range_buffer :{0}'.format(byteBuffer[idX:idX+4])) # idX += 4 # # print(pUnit[3]) # print('rangeVal[objectNum]:{0}'.format(rangeVal[objectNum] )) # azimuth[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) # idX += 4 # dopplerVal[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) # idX += 4 # snr[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) # print('last_snr_buffer:{0}'.format(byteBuffer[idX:idX+4])) # idX += 4 # #add by jimmy # word=[1] # el_val[objectNum] = np.matmul(byteBuffer[idX:idX + 1], word) # print('new_el_buffer:{0}'.format(byteBuffer[idX:idX + 1])) # print('new_el_val[objNum]:{0}'.format(el_val[objectNum])) # idX += 1 # az_val[objectNum] = np.matmul(byteBuffer[idX:idX + 1], word) # print('new_az_buffer:{0}'.format(byteBuffer[idX:idX + 1])) # print('new_az_val[objNum]:{0}'.format(az_val[objectNum])) # idX += 1 # word=[1, 2**8] # dop_val[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) # print('new_dop_buffer:{0}'.format(byteBuffer[idX:idX + 2])) # print('new_dop_val[objNum]:{0}'.format(dop_val[objectNum])) # idX +=2 # range_val[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) # print('new_range_buffer:{0}'.format(byteBuffer[idX:idX + 2])) # print('new_range_val[objNum]:{0}'.format(range_val[objectNum])) # idX +=2 # snr_val[objectNum] = np.matmul(byteBuffer[idX:idX+2],word) # print('new_snr_buffer:{0}'.format(byteBuffer[idX:idX + 2])) # print('new_snr_val[objNum]:{0}'.format(snr_val[objectNum])) # idX +=2 pointObj = {"numObj": numInputPoints, "range": rangeVal, "azimuth": azimuth,\ "doppler": dopplerVal,"elevation":elevation, "snr": snr} # print('POINT ALL:{0}'.format(pointObj)) print('POINT_range:{0}'.format(pointObj["range"])) print('POINT_azi:{0}'.format(pointObj["azimuth"])) print('POINT_ele:{0}'.format(pointObj["elevation"])) dataOK = 1 elif tlv_type == MMWDEMO_UART_MSG_TARGET_LIST_2D: word = [1, 2 ** 8, 2 ** 16, 2 ** 24] # Calculate the number of target points numTargetPoints = (tlv_length - tlvHeaderLengthInBytes) // targetLengthInBytes # Initialize the arrays print('numTargetPoints:{0}'.format(numTargetPoints)) # fine targetId = np.zeros(numTargetPoints, dtype=np.uint32) posX = np.zeros(numTargetPoints, dtype=np.float32) posY = np.zeros(numTargetPoints, dtype=np.float32) velX = np.zeros(numTargetPoints, dtype=np.float32) velY = np.zeros(numTargetPoints, dtype=np.float32) accX = np.zeros(numTargetPoints, dtype=np.float32) accY = np.zeros(numTargetPoints, dtype=np.float32) EC = np.zeros((3, 3, numTargetPoints), dtype=np.float32) # Error covariance matrix G = np.zeros(numTargetPoints, dtype=np.float32) # Gain for objectNum in range(numTargetPoints): # Read the data for each object # print("idx:{0}".format(idX)) targetId[objectNum] = np.matmul(byteBuffer[idX:idX + 4], word) idX += 4 posX[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 posY[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 velX[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 velY[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 accX[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 accY[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[0, 0, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[0, 1, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[0, 2, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[1, 0, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[1, 1, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[1, 2, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[2, 0, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[2, 1, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 EC[2, 2, objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 G[objectNum] = byteBuffer[idX:idX + 4].view(dtype=np.float32) idX += 4 # Store the data in the detObj dictionary targetObj = {"targetId": targetId, "posX": posX, "posY": posY, \ "velX": velX, "velY": velY, "accX": accX, "accY": accY, \ "EC": EC, "G": G, "numTargets":numTargetPoints} print("TARGET_OBJ_posX:{0}".format(targetObj["posX"])) print("TARGET_OBJ_posY:{0}".format(targetObj["posY"])) targetDetected = 1 elif tlv_type == MMWDEMO_UART_MSG_TARGET_INDEX_2D: # Calculate the length of the index message # print("not here maybe2") numIndices = tlv_length - tlvHeaderLengthInBytes indices = byteBuffer[idX:idX + numIndices] idX += numIndices # Remove already processed data if idX > 0: shiftSize = totalPacketLen byteBuffer[:byteBufferLength - shiftSize] = byteBuffer[shiftSize:byteBufferLength] byteBuffer[byteBufferLength - shiftSize:] = np.zeros(len(byteBuffer[byteBufferLength - shiftSize:]),dtype = 'uint8') byteBufferLength = byteBufferLength - shiftSize # Check that there are no errors with the buffer length if byteBufferLength < 0: byteBufferLength = 0 return dataOK, targetDetected, frameNumber, targetObj, pointObj # ------------------------------------------------------------------ # Funtion to update the data and display in the plot def update(): dataOk = 0 targetDetected = 0 global targetObj global pointObj x = [] y = [] # Read and parse the received data dataOk, targetDetected, frameNumber, targetObj, pointObj = readAndParseData16xx(Dataport, configParameters) print('check_dataOk:{0}'.format(dataOk)) print('targetDetected:{0}'.format(targetDetected)) print('frameNumber:{0}'.format(frameNumber)) if p_cnt % 1000 == 0 : printTest() if targetDetected: # print(targetObj) # print(targetObj["numTargets"]) x = -targetObj["posX"] y = targetObj["posY"] print('tar_x:{0}'.format(x)) print('tar_y:{0}'.format(y)) target_cnt = 0; # LOCATION logic start #HARD CODING if(1.5<x.any() <2.2 and 2.4<y.any() <3.2): #This line has to be changed GUI User Interface Command print('TARGET DETECTED THEN DB IN ') x_d = x y_d = y # print('target_x:{0}'.format(x)) # s3.setData(x_d,y_d) d = datetime.today().strftime('%Y-%m-%d %H:%M:%S') sql = "INSERT INTO data Values(%s,%s,%s)" Cursor.execute(sql,( '_Radar_target', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'target') ) Maria.commit(); time.sleep(0.5) ##################################### # s2.setData(x,y, clear = True) # QtGui.QApplication.processEvents() # print('dataOk: {0}, targetDetected:{1}, frameNumber:{2}, targetObj:{3}, pointobj:{4}'.format(dataOK,targetDetected, frameNumber, targetObj, pointObj)) print(dataOk) if dataOk: x = -pointObj["range"]*np.sin(pointObj["azimuth"]) y = pointObj["range"]*np.cos(pointObj["azimuth"]) # point_range = pointObj["range"] # point_azimuth = pointObj["azimuth"] # print('FINAL_Range : {0}'.format(point_range)) # print('FINAL_Azimuth : {0}'.format(point_azimuth)) # x = -pointObj["rangeUnit"]*np.sin(pointObj["azimuthUnit"]) # y = pointObj["rangeUnit"]*np.cos(pointObj["azimuthUnit"]) print('FINAL_x:{0}'.format(x)) print('FINAL_y:{0}'.format(y)) #DB logic added point_cnt = 0; px_tmp=[]; for po in range(len(x)): if(1.5 < x[po]< 2.3 and 2.4< y[po] < 3.2 ): #specific targeted area # print("HHHHHHHHHHHHHHHHH") point_cnt += 1 print('poin_cnt:{0}'.format(point_cnt)) # ave_tmp = np.mean(x[po]) # p_tmp.append(x[po]) # print(ave_tmp) # px_tmp.append(ave_tmp) # if(ave_tmp<) # print(px_tmp) if(point_cnt >3): print("PEOPLE DETECDED ") #ok # d = datetime.today().strftime('%Y-%m-%d %H:%M:%S') sql = "INSERT INTO data Values(%s,%s,%s)" Cursor.execute(sql,( 'People Detected ', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'mainDoor' ) ) Maria.commit(); print("DB saved") # f = open('log.txt','w') # # data = "%Y-%m-%d %H:%M:%S", time.localtime() # f.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) # f.close() time.sleep(0.5) # s1.setData(x,y, clear=True) # QtGui.QApplication.processEvents() print('dataok:{0}'.format(dataOk)) return dataOk # ------------------------- MAIN ----------------------------------------- #try #### class # Configurate the serial port CLIport, Dataport = serialConfig(configFileName) # Get the configuration parameters from the configuration file configParameters = parseConfigFile(configFileName) # START QtAPPfor the plot # app = QtGui.QApplication([]) # Set the plot # pg.setConfigOption('background','w') # win = pg.GraphicsWindow(title="2D scatter plot") # p = win.addPlot() # # p.setXRange(-0.5,0.5) # p.setXRange(-3,3) # p.setYRange(0,6) # p.setLabel('left',text = 'Y position (m)') # p.setLabel('bottom', text= 'X position (m)') # s1 = p.plot([],[],pen=None,symbol='o') # s1 = p.plot([],[],pen=None,symbol='o') # s2 = p.plot([],[],pen=(0,0,255),symbol='star') # s3 = p.plot([],[],pen=(0,0,255),symbol='x') # Main loop targetObj = {} pointObj = {} frameData = {} currentIndex = 0 while True: p_cnt+=1 print(p_cnt) try: # Update the data and check if the data is okay dataOk = update() if(p_cnt%50000 == 0): printTest() if dataOk: # Store the current frame into frameData # print('print_takget : {0}'.format(targetObj)) frameData[currentIndex] = targetObj currentIndex += 1 time.sleep(0.033) # Sampling frequency of 30 Hz # Stop the program and close everything if Ctrl + c is pressed except KeyboardInterrupt: CLIport.write(('sensorStop\n').encode()) CLIport.close() Dataport.close() win.close() break
'Share > TI Radar' 카테고리의 다른 글
TI mmWave People Counting Demo Setting, 세팅 관련 요약 (5) 2021.04.22 TI IWR 6843 + RPI example (0) 2021.04.15 Ti Radar board 세팅 - Visualizer Demo 까지 (0) 2021.01.12 TI mmWave Vital Sign Lab. (0) 2020.09.19