-
TI mmwaveISK 6843 with raspberry PI source codeShare/TI Radar 2021. 10. 11. 14:41
This code connects the TI mmWave ISK 6843 interface to a Raspberry Pi 4 (Raspbian).
This code is based on github, https://github.com/ibaiGorordo/AWR1642-Read-Data-Python-MMWAVE-SDK-2
However, the code above only handles the message frame format of the 1642 interface, which differs slightly from that of the ISK 6843.
So I changed the source code to work with the 6843.
I'll share the source code first and follow up soon with a more detailed explanation of how it works.
# Try to connect to the DB
# Point cloud completed 21.05.27
# Simple logic, then DB
import math
import struct
import threading
import time
from datetime import datetime

import mysql.connector
import numpy as np
import pyqtgraph as pg
import serial
from pyqtgraph.Qt import QtGui

# NOTE(review): host and credentials are hard-coded in the source; consider
# loading them from the environment or a config file that is not shared.
Maria = mysql.connector.connect(host="10.20.102.202", user="root", passwd="raspberry", database="sensor1")
Cursor = Maria.cursor(prepared=True)

# Change the configuration file name
# configFileName = 'mmw_pplcount_demo_default.cfg'
configFileName = 'ppCntCfg.cfg'
# configFileName = '1443config.cfg'
# configFileName = 'selfConfig1.cfg'

# Serial port handles; replaced by serial.Serial objects in serialConfig()
CLIport = {}
Dataport = {}

# Receive buffer shared across calls of the data parser, and the number of
# valid bytes currently stored in it
byteBuffer = np.zeros(2**15, dtype='uint8')
byteBufferLength = 0

# ------------------------------------------------------------------

# Main-loop iteration counter
p_cnt = 0


def printTest():
    """Insert a 'Check Time Saved ' heartbeat row with the current local time into the DB."""
    sql = "INSERT INTO data Values(%s,%s,%s)"
    Cursor.execute(sql, ('Check Time Saved ', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'chk'))
    Maria.commit()
    # threading.Timer(10,printTest).start()


# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
    """Open both serial ports and send every line of the config file to the radar.

    Args:
        configFileName: Path of the radar configuration (.cfg) file.

    Returns:
        Tuple ``(CLIport, Dataport)`` of the opened ``serial.Serial`` objects;
        the module-level ``CLIport`` / ``Dataport`` are set to the same objects.
    """
    global CLIport
    global Dataport

    # Open the serial ports for the configuration and the data ports
    # Raspberry pi
    CLIport = serial.Serial('/dev/ttyUSB0', 115200)
    Dataport = serial.Serial('/dev/ttyUSB1', 921600)

    # Windows
    # CLIport = serial.Serial('COM3', 115200)
    # Dataport = serial.Serial('COM4', 921600)

    # Read the configuration file (closing it afterwards) and send it to the board
    with open(configFileName) as configFile:
        config = [line.rstrip('\r\n') for line in configFile]

    for command in config:
        CLIport.write((command + '\n').encode())
        print(command)
        # Short pause between commands, as in the original sequence
        time.sleep(0.01)

    return CLIport, Dataport


# ------------------------------------------------------------------

# Function to parse the data inside the configuration file
def parseConfigFile(configFileName):
    """Derive the radar parameters from the profileCfg and frameCfg lines.

    Args:
        configFileName: Path of the radar configuration (.cfg) file.

    Returns:
        Dict with the keys ``numDopplerBins``, ``numRangeBins``,
        ``rangeResolutionMeters``, ``rangeIdxToMeters``,
        ``dopplerResolutionMps``, ``maxRange`` and ``maxVelocity``.

    Raises:
        ValueError: If the file has no ``profileCfg`` or no ``frameCfg`` line.
    """
    # Hard code the number of TX antennas, change if other configuration is used.
    # (The original also declared 4 RX antennas, but no formula below uses them.)
    numTxAnt = 3

    profileFound = False
    frameFound = False

    with open(configFileName) as configFile:
        for line in configFile:
            # Split on any run of whitespace so repeated spaces or tabs do not
            # produce empty tokens that shift the field positions
            splitWords = line.split()
            if not splitWords:
                continue

            # Get the information about the profile configuration
            if "profileCfg" in splitWords[0]:
                startFreq = int(float(splitWords[2]))
                idleTime = int(float(splitWords[3]))
                rampEndTime = float(splitWords[5])
                freqSlopeConst = float(splitWords[8])
                numAdcSamples = int(float(splitWords[10]))
                digOutSampleRate = int(float(splitWords[11]))

                # Round the number of ADC samples up to the next power of two
                numAdcSamplesRoundTo2 = 1
                while numAdcSamples > numAdcSamplesRoundTo2:
                    numAdcSamplesRoundTo2 = numAdcSamplesRoundTo2 * 2

                profileFound = True

            # Get the information about the frame configuration
            elif "frameCfg" in splitWords[0]:
                chirpStartIdx = int(float(splitWords[1]))
                chirpEndIdx = int(float(splitWords[2]))
                numLoops = int(float(splitWords[3]))

                frameFound = True

    if not (profileFound and frameFound):
        raise ValueError(
            "{0} must contain both a profileCfg and a frameCfg line".format(configFileName))

    # Combine the read data to obtain the configuration parameters
    configParameters = {}
    numChirpsPerFrame = (chirpEndIdx - chirpStartIdx + 1) * numLoops
    configParameters["numDopplerBins"] = numChirpsPerFrame / numTxAnt
    configParameters["numRangeBins"] = numAdcSamplesRoundTo2
    configParameters["rangeResolutionMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * numAdcSamples)
    configParameters["rangeIdxToMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * configParameters["numRangeBins"])
    configParameters["dopplerResolutionMps"] = 3e8 / (2 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * configParameters["numDopplerBins"] * numTxAnt)
    configParameters["maxRange"] = (300 * 0.9 * digOutSampleRate) / (2 * freqSlopeConst * 1e3)
    configParameters["maxVelocity"] = 3e8 / (4 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * numTxAnt)

    return configParameters
# ------------------------------------------------------------------

# Packed layout of one point-cloud entry (8 bytes), in the order the fields are
# read from the stream: int8 elevation, int8 azimuth, int16 doppler,
# uint16 range, uint16 snr. Little-endian, like the header integers.
_POINT_DTYPE = np.dtype([
    ('elevation', 'i1'),
    ('azimuth', 'i1'),
    ('doppler', '<i2'),
    ('range', '<u2'),
    ('snr', '<u2'),
])

# Packed layout of one target entry (68 bytes): uint32 id, six float32
# kinematic values, a 3x3 float32 error covariance (row by row) and a gain.
# NOTE(review): this is the 2D target record the script was written for;
# confirm it matches the firmware actually flashed on the board.
_TARGET_DTYPE = np.dtype([
    ('targetId', '<u4'),
    ('posX', '<f4'),
    ('posY', '<f4'),
    ('velX', '<f4'),
    ('velY', '<f4'),
    ('accX', '<f4'),
    ('accY', '<f4'),
    ('EC', '<f4', (3, 3)),
    ('G', '<f4'),
])


def _readUInt(buf, offset, size):
    """Return the little-endian unsigned integer stored in buf[offset:offset + size]."""
    return int.from_bytes(buf[offset:offset + size].tobytes(), 'little')


def _discardBytes(count):
    """Drop the first *count* bytes of the global receive buffer and zero the freed tail."""
    global byteBufferLength
    count = max(0, min(int(count), byteBufferLength))
    remaining = byteBufferLength - count
    byteBuffer[:remaining] = byteBuffer[count:byteBufferLength]
    byteBuffer[remaining:] = 0
    byteBufferLength = remaining


def _parsePointCloud(buf, start, end):
    """Decode a point-cloud TLV payload located at buf[start:end] into a pointObj dict."""
    # The payload starts with five float32 scale factors (elevation, azimuth,
    # doppler, range, snr); only the bytes after them hold points.
    unitLengthInBytes = 20
    payloadLength = end - start
    numInputPoints = max(payloadLength - unitLengthInBytes, 0) // _POINT_DTYPE.itemsize
    print('numOfInputPoint :{0}'.format(numInputPoints))

    if payloadLength >= unitLengthInBytes:
        units = np.frombuffer(buf[start:start + unitLengthInBytes].tobytes(), dtype='<f4')
    else:
        units = np.zeros(5, dtype=np.float32)
    eleUnit, aziUnit, dopUnit, rngUnit, snrUnit = units

    pointsStart = start + unitLengthInBytes
    pointsEnd = pointsStart + numInputPoints * _POINT_DTYPE.itemsize
    raw = np.frombuffer(buf[pointsStart:pointsEnd].tobytes(), dtype=_POINT_DTYPE)

    # Scale the packed integers to float32 values with the per-frame units
    return {"numObj": numInputPoints,
            "range": raw['range'].astype(np.float32) * rngUnit,
            "azimuth": raw['azimuth'].astype(np.float32) * aziUnit,
            "doppler": raw['doppler'].astype(np.float32) * dopUnit,
            "elevation": raw['elevation'].astype(np.float32) * eleUnit,
            "snr": raw['snr'].astype(np.float32) * snrUnit}


def _parseTargetList(buf, start, end):
    """Decode a target-list TLV payload located at buf[start:end] into a targetObj dict."""
    numTargetPoints = max(end - start, 0) // _TARGET_DTYPE.itemsize
    print('numTargetPoints:{0}'.format(numTargetPoints))

    targetsEnd = start + numTargetPoints * _TARGET_DTYPE.itemsize
    raw = np.frombuffer(buf[start:targetsEnd].tobytes(), dtype=_TARGET_DTYPE)

    # np.array(...) copies each field so the results are writable arrays.
    # EC keeps the (3, 3, numTargets) layout: EC[row, col, target].
    return {"targetId": np.array(raw['targetId']),
            "posX": np.array(raw['posX']), "posY": np.array(raw['posY']),
            "velX": np.array(raw['velX']), "velY": np.array(raw['velY']),
            "accX": np.array(raw['accX']), "accY": np.array(raw['accY']),
            "EC": np.array(np.moveaxis(raw['EC'], 0, -1)),
            "G": np.array(raw['G']),
            "numTargets": numTargetPoints}


# Function to read and parse the incoming data
def readAndParseData16xx(Dataport, configParameters):
    """Read the pending serial bytes and parse at most one complete frame.

    New bytes are appended to the global ``byteBuffer``; the buffer is aligned
    to the first magic word and, once a whole frame is available, its TLVs are
    decoded and the frame is removed from the buffer.

    Args:
        Dataport: Open serial port providing ``in_waiting`` and ``read()``.
        configParameters: Unused; kept so existing callers keep working.

    Returns:
        Tuple ``(dataOK, targetDetected, frameNumber, targetObj, pointObj)``.
        ``dataOK`` is 1 when a point-cloud TLV was parsed and
        ``targetDetected`` is 1 when a target-list TLV was parsed; the two
        dicts are empty when the matching TLV was absent.
    """
    global byteBuffer, byteBufferLength

    # Constants
    MMWDEMO_UART_MSG_POINT_CLOUD_3D = 6
    MMWDEMO_UART_MSG_TARGET_LIST_2D = 7
    MMWDEMO_UART_MSG_TARGET_INDEX_2D = 8  # index TLV: skipped, nothing uses it
    maxBufferSize = 2**15
    # Header: magic(8) version(4) totalPacketLen(4) platform(4) frameNumber(4)
    # subFrameNumber(4) chirpMargin(4) frameMargin(4) trackProcessTime(4)
    # uartSentTime(4) numTLVs(2) checksum(2)
    frameHeaderLengthInBytes = 48
    tlvHeaderLengthInBytes = 8
    magicWord = np.array([2, 1, 4, 3, 6, 5, 8, 7], dtype=np.uint8)

    # Initialize variables
    magicOK = 0  # Checks if magic number has been read
    dataOK = 0  # Checks if the data has been read correctly
    targetDetected = 0  # Checks if a person has been detected
    frameNumber = 0
    totalPacketLen = 0
    targetObj = {}
    pointObj = {}

    readBuffer = Dataport.read(Dataport.in_waiting)
    byteVec = np.frombuffer(readBuffer, dtype='uint8')
    byteCount = len(byteVec)
    print('inital byteVec:{0}'.format(byteVec))

    # Add the data to the buffer. If it does not fit, the buffered bytes can
    # never become a complete frame, so restart with the newest bytes instead
    # of silently dropping all new data forever.
    if (byteBufferLength + byteCount) >= maxBufferSize:
        byteBuffer[:] = 0
        byteBufferLength = 0
        byteVec = byteVec[-(maxBufferSize - 1):]
        byteCount = len(byteVec)
    byteBuffer[byteBufferLength:byteBufferLength + byteCount] = byteVec
    byteBufferLength = byteBufferLength + byteCount

    # Check that the buffer has some data
    if byteBufferLength > 16:
        # Find the first complete magic word inside the valid part of the buffer
        searchEnd = byteBufferLength - len(magicWord) + 1
        frameStart = -1
        for loc in np.where(byteBuffer[:searchEnd] == magicWord[0])[0]:
            if np.array_equal(byteBuffer[loc:loc + len(magicWord)], magicWord):
                frameStart = int(loc)
                break

        if frameStart > 0:
            # Remove the data before the first start index
            _discardBytes(frameStart)
        elif frameStart < 0:
            # No magic word yet: keep only the tail that could still be the
            # beginning of one
            _discardBytes(byteBufferLength - (len(magicWord) - 1))

        # Only trust the length field once the whole header has arrived
        if frameStart >= 0 and byteBufferLength >= frameHeaderLengthInBytes:
            # Read the total packet length
            totalPacketLen = _readUInt(byteBuffer, 12, 4)
            if totalPacketLen < frameHeaderLengthInBytes or totalPacketLen >= maxBufferSize:
                # Impossible length: drop this magic word so the next call
                # re-synchronises on the following frame
                _discardBytes(len(magicWord))
            elif byteBufferLength >= totalPacketLen:
                # Check that all the packet has been read
                magicOK = 1

    # If magicOK is equal to 1 then process the message
    if magicOK:
        print('current bytebuffer now will cook: {0}'.format(byteBuffer[:byteBufferLength]))

        # Read the header fields that are used
        frameNumber = _readUInt(byteBuffer, 20, 4)
        numTLVs = _readUInt(byteBuffer, 44, 2)
        idX = frameHeaderLengthInBytes

        # Read the TLV messages
        for tlvIdx in range(numTLVs):
            if idX + tlvHeaderLengthInBytes > totalPacketLen:
                break

            # Check the header of the TLV message
            tlv_type = _readUInt(byteBuffer, idX, 4)
            tlv_length = _readUInt(byteBuffer, idX + 4, 4)

            # tlv_length is treated as including the 8-byte TLV header, as the
            # original size arithmetic did. Stop on a TLV that cannot fit in
            # the frame.
            tlvEnd = idX + tlv_length
            if tlv_length < tlvHeaderLengthInBytes or tlvEnd > totalPacketLen:
                break
            payloadStart = idX + tlvHeaderLengthInBytes

            # Read the data depending on the TLV message
            if tlv_type == MMWDEMO_UART_MSG_POINT_CLOUD_3D:
                pointObj = _parsePointCloud(byteBuffer, payloadStart, tlvEnd)
                print('POINT_range:{0}'.format(pointObj["range"]))
                print('POINT_azi:{0}'.format(pointObj["azimuth"]))
                print('POINT_ele:{0}'.format(pointObj["elevation"]))
                dataOK = 1

            elif tlv_type == MMWDEMO_UART_MSG_TARGET_LIST_2D:
                targetObj = _parseTargetList(byteBuffer, payloadStart, tlvEnd)
                print("TARGET_OBJ_posX:{0}".format(targetObj["posX"]))
                print("TARGET_OBJ_posY:{0}".format(targetObj["posY"]))
                targetDetected = 1

            # Always continue at the end of this TLV so unknown or partly
            # consumed TLVs cannot desynchronise the following ones
            idX = tlvEnd

        # Remove already processed data
        _discardBytes(totalPacketLen)

    return dataOK, targetDetected, frameNumber, targetObj, pointObj


# ------------------------------------------------------------------

def _saveEvent(label, tag):
    """Insert one (label, current local time, tag) row into the DB and commit."""
    sql = "INSERT INTO data Values(%s,%s,%s)"
    Cursor.execute(sql, (label, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), tag))
    Maria.commit()


# Function to update the data and store detections in the DB
def update():
    """Parse the next frame and log detections inside the watched area.

    A target inside the target zone, or more than three cloud points inside
    the point zone, writes one DB row per frame followed by a 0.5 s pause.

    Returns:
        The ``dataOk`` flag from ``readAndParseData16xx`` (1 when a point
        cloud was parsed).
    """
    global targetObj
    global pointObj

    # Read and parse the received data
    dataOk, targetDetected, frameNumber, targetObj, pointObj = readAndParseData16xx(Dataport, configParameters)
    print('check_dataOk:{0}'.format(dataOk))
    print('targetDetected:{0}'.format(targetDetected))
    print('frameNumber:{0}'.format(frameNumber))

    if p_cnt % 1000 == 0:
        printTest()

    if targetDetected:
        x = -targetObj["posX"]
        y = targetObj["posY"]
        print('tar_x:{0}'.format(x))
        print('tar_y:{0}'.format(y))

        # LOCATION logic start
        # HARD CODING: this zone should eventually come from a GUI setting.
        # The test is element-wise: `1.5 < x.any() < 2.2` compared a boolean
        # against the limits and could never be true.
        targetInZone = (1.5 < x) & (x < 2.2) & (2.4 < y) & (y < 3.2)
        if np.any(targetInZone):
            print('TARGET DETECTED THEN DB IN ')
            _saveEvent('_Radar_target', 'target')
            time.sleep(0.5)
        # s2.setData(x,y, clear = True)
        # QtGui.QApplication.processEvents()

    print(dataOk)
    if dataOk:
        x = -pointObj["range"] * np.sin(pointObj["azimuth"])
        y = pointObj["range"] * np.cos(pointObj["azimuth"])
        print('FINAL_x:{0}'.format(x))
        print('FINAL_y:{0}'.format(y))

        # DB logic: count the points inside the specific targeted area
        pointInZone = (1.5 < x) & (x < 2.3) & (2.4 < y) & (y < 3.2)
        point_cnt = int(np.count_nonzero(pointInZone))
        if point_cnt > 0:
            print('poin_cnt:{0}'.format(point_cnt))

        # One row per frame is enough to record the detection
        if point_cnt > 3:
            print("PEOPLE DETECDED ")
            _saveEvent('People Detected ', 'mainDoor')
            print("DB saved")
            time.sleep(0.5)
        # s1.setData(x,y, clear=True)
        # QtGui.QApplication.processEvents()

    print('dataok:{0}'.format(dataOk))
    return dataOk


# ------------------------- MAIN -----------------------------------------

# Configure the serial port
CLIport, Dataport = serialConfig(configFileName)

# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)

# Optional Qt scatter plot (disabled). `win` stays None unless it is enabled.
win = None
# app = QtGui.QApplication([])
# pg.setConfigOption('background','w')
# win = pg.GraphicsWindow(title="2D scatter plot")
# p = win.addPlot()
# p.setXRange(-3,3)
# p.setYRange(0,6)
# p.setLabel('left',text = 'Y position (m)')
# p.setLabel('bottom', text= 'X position (m)')
# s1 = p.plot([],[],pen=None,symbol='o')
# s2 = p.plot([],[],pen=(0,0,255),symbol='star')
# s3 = p.plot([],[],pen=(0,0,255),symbol='x')

# Main loop
targetObj = {}
pointObj = {}
frameData = {}
currentIndex = 0
# Upper bound on stored frames so an always-on sensor does not exhaust memory
maxStoredFrames = 1000

try:
    while True:
        p_cnt += 1
        print(p_cnt)

        # Update the data and check if the data is okay
        dataOk = update()

        if p_cnt % 50000 == 0:
            printTest()

        if dataOk:
            # Store the current frame into frameData, dropping the oldest one
            frameData[currentIndex] = targetObj
            frameData.pop(currentIndex - maxStoredFrames, None)
            currentIndex += 1

        time.sleep(0.033)  # Sampling frequency of 30 Hz

# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
    pass
finally:
    CLIport.write(('sensorStop\n').encode())
    CLIport.close()
    Dataport.close()
    # The plot window only exists when the optional Qt block is enabled
    if win is not None:
        win.close()
    Cursor.close()
    Maria.close()
'Share > TI Radar' 카테고리의 다른 글
TI mmWave People Counting Demo Setting, 세팅 관련 요약 (5) 2021.04.22 TI IWR 6843 + RPI example (0) 2021.04.15 Ti Radar board 세팅 - Visualizer Demo 까지 (0) 2021.01.12 TI mmWave Vital Sign Lab. (0) 2020.09.19