Mercurial > hg > nsaunier > traffic-intelligence
changeset 1204:a12d126346ff
merge
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Wed, 22 Mar 2023 22:01:12 -0400 |
| parents | 7b3384a8e409 (diff) 4356065ed3ca (current diff) |
| children | 3905b393ade0 |
| files | trafficintelligence/storage.py |
| diffstat | 2 files changed, 165 insertions(+), 6 deletions(-) [+] |
line wrap: on
line diff
--- a/trafficintelligence/moving.py Thu Dec 08 10:24:22 2022 -0500 +++ b/trafficintelligence/moving.py Wed Mar 22 22:01:12 2023 -0400 @@ -357,8 +357,13 @@ return (p1-p2).norm2() @staticmethod - def plotAll(points, options = '', **kwargs): - plot([p.x for p in points], [p.y for p in points], options, **kwargs) + def plotAll(points, options = '', closePolygon = False, **kwargs): + xCoords = [p.x for p in points] + yCoords = [p.y for p in points] + if closePolygon: + xCoords.append[0] + yCoords.append[0] + plot(xCoords, yCoords, options, **kwargs) def similarOrientation(self, refDirection, cosineThreshold): 'Indicates whether the cosine of the vector and refDirection is smaller than cosineThreshold' @@ -1253,7 +1258,7 @@ and a usertype (e.g. road user) coded as a number (see userTypeNames) ''' - def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, initCurvilinear = False): + def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, features = None, initCurvilinear = False): super(MovingObject, self).__init__(num, timeInterval) if initCurvilinear: self.curvilinearPositions = positions @@ -1264,7 +1269,7 @@ self.geometry = geometry self.userType = userType self.setNObjects(nObjects) # a feature has None for nObjects - self.features = None + self.features = features # compute bounding polygon from trajectory @staticmethod @@ -1651,6 +1656,11 @@ else: self.positions.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, None, **kwargs) + def plotOutlineAtInstant(self, t, options = '', **kwargs): + if self.hasFeatures(): + points = [f.getPositionAtInstant(t) for f in self.getFeatures()] + Point.plotAll(points, options, True, kwargs) + def play(self, videoFilename, homography = None, undistort = False, intrinsicCameraMatrix = None, distortionCoefficients = None, undistortedImageMultiplication = 1.): cvutils.displayTrajectories(videoFilename, [self], homography = homography, firstFrameNum = self.getFirstInstant(), lastFrameNumArg = self.getLastInstant(), undistort = undistort, intrinsicCameraMatrix = intrinsicCameraMatrix, distortionCoefficients = distortionCoefficients, undistortedImageMultiplication = undistortedImageMultiplication)
--- a/trafficintelligence/storage.py Thu Dec 08 10:24:22 2022 -0500 +++ b/trafficintelligence/storage.py Wed Mar 22 22:01:12 2023 -0400 @@ -7,7 +7,7 @@ from copy import copy import sqlite3, logging -from numpy import log, min as npmin, max as npmax, round as npround, array, sum as npsum, loadtxt, floor as npfloor, ceil as npceil, linalg, int32, int64 +from numpy import log, min as npmin, max as npmax, round as npround, array, sum as npsum, loadtxt, floor as npfloor, ceil as npceil, linalg, int32, int64, reshape, dot, vstack, transpose, ones from pandas import read_csv, merge from trafficintelligence import utils, moving, events, indicators @@ -1256,6 +1256,155 @@ inputfile.close() return objects +# from https://github.com/kuixu/kitti_object_vis +def inverse_rigid_trans(Tr): + """ Inverse a rigid body transform matrix (3x4 as [R|t]) + [R'|-R't; 0|1] + """ + import numpy as np + inv_Tr = np.zeros_like(Tr) # 3x4 + inv_Tr[0:3, 0:3] = transpose(Tr[0:3, 0:3]) + inv_Tr[0:3, 3] = dot(-transpose(Tr[0:3, 0:3]), Tr[0:3, 3]) + return inv_Tr + +def loadKITTICalibration(filename): + '''Loads KITTI calibration data''' + calib = {} + with open(filename, 'r') as f: + for l in f.readlines(): + l = l.rstrip() + if len(l) == 0: + continue + key, value = l.split(' ', 1) + if ":" in key: + key = key[:-1] + try: + calib[key] = array([float(x) for x in value.split()]) + except ValueError as e: + print(e) + continue + #calib['Tr_velo_to_cam'] = calib['Tr_velo_cam'] + #calib['Tr_imu_to_velo'] = calib['Tr_imu_velo'] + temps = reshape(calib['Tr_velo_cam'], (3, 4)) + calib['Tr_cam_velo'] = inverse_rigid_trans(temps) + temps = reshape(calib['Tr_imu_velo'], (3, 4)) + calib['Tr_velo_imu'] = inverse_rigid_trans(temps) + + P = reshape(calib['P2'], (3, 4)) + calib['c_u'] = P[0, 2] + calib['c_v'] = P[1, 2] + calib['f_u'] = P[0, 0] + calib['f_v'] = P[1, 1] + calib['b_x'] = P[0, 3] / (-calib['f_u']) # relative + calib['b_y'] = P[1, 3] / (-calib['f_v']) + return calib + +# from https://github.com/utiasSTARS/pykitti/blob/master/pykitti/utils.py + + +# def get_cam_in_ex(calib): +# """ +# Qingwu +# :param calib: calib from kitti_tracking +# :return: cam_parameters: Camera intrinsics and extrinsics +# """ +# cam_parameters = {} +# P = np.reshape(calib["P2"], [3, 4]) +# cam_parameters["c_u"] = P[0, 2] +# cam_parameters["c_v"] = P[1, 2] +# cam_parameters["f_u"] = P[0, 0] +# cam_parameters["f_v"] = P[1, 1] +# cam_parameters["b_x"] = P[0, 3] / (-cam_parameters["f_u"]) # relative +# cam_parameters["b_y"] = P[1, 3] / (-cam_parameters["f_v"]) +# return cam_parameters + + +def loadTrajectoriesFromKITTI(filename, kittiCalibration, oxts): + '''Reads data from KITTI ground truth or output from an object detection and tracking method + + kittiCalibration is obtained from loading training/testing calibration file for each sequence 00XX.txt + oxts is obtained using utils.load_oxts_packets_and_poses(['./training/oxts/0001.txt']) from pykitti + + Ref: https://github.com/pratikac/kitti/blob/master/readme.tracking.txt''' + from pykitti.utils import roty + + invR0 = linalg.inv(reshape(kittiCalibration['R_rect'], (3, 3))) + + header = ['frame', # 0, 1, ..., n + 'trackingid', # -1, 0 , 1, ..., k + 'type', # 'Car', 'Pedestrian', ... + 'truncation', # truncated pixel ratio [0..1] + 'occlusion', # 0=visible, 1=partly occluded, 2=fully occluded, 3=unknown + 'alpha', # object observation angle [-pi..pi] + # extract 2d bounding box in 0-based coordinates + 'xmin', # left + 'ymin', # top + 'xmax', # right + 'ymax', # bottom + # extract 3d bounding box information + 'h', # box height + 'w', # box width + 'l', # box length (in meters) + 'x', 'y', 'z', # location (x,y,z) in camera coord + 'ry'] # yaw angle (around Y-axis in camera coordinates) [-pi..pi] + data = read_csv(filename, delimiter = ' ', names = header) + data = data[data.trackingid > -1] + + objects = [] + featureNum = 0 + for objNum in data.trackingid.unique(): + tmp = data[data.trackingid == objNum].sort_values('frame') + t = moving.Trajectory()#([[float(numbers[6])],[float(numbers[7])]]) + featureTrajectories = [moving.Trajectory() for i in range(4)] + for i, r in tmp.iterrows(): + _, Tr_imu2w = oxts[r.frame] + # transfer_matrix = {'P2': reshape(kittiCalibration['P2'], (3, 4)), + # 'R_rect': kittiCalibration['R_rect'], + # 'Tr_cam_to_velo': kittiCalibration['Tr_cam_velo'], + # # transfer_matrix['Tr_velo_to_cam'] = kittiCalibration['Tr_velo_to_cam'] + # 'Tr_velo_to_imu': kittiCalibration['Tr_velo_imu'], + # # transfer_matrix['Tr_imu_to_velo'] = kittiCalibration['Tr_imu_to_velo'] + # 'Tr_imu_to_world': Tr_imu2w} + # 3D object + # compute rotational matrix around yaw axis + R = roty(r.ry) + + # 3d bounding box corners + x_corners = [r.l / 2, r.l / 2, -r.l / 2, -r.l / 2, r.l / 2, r.l / 2, -r.l / 2, -r.l / 2] + y_corners = [0, 0, 0, 0, -r.h, -r.h, -r.h, -r.h] + z_corners = [r.w / 2, -r.w / 2, -r.w / 2, r.w / 2, r.w / 2, -r.w / 2, -r.w / 2, r.w / 2] + # rotate and translate 3d bounding box + corners3d = dot(R, vstack([x_corners, y_corners, z_corners])) + corners3d[0, :] = corners3d[0, :] + r.x#obj.t[0] + corners3d[1, :] = corners3d[1, :] + r.y#obj.t[1] + corners3d[2, :] = corners3d[2, :] + r.z#obj.t[2] + # corners3d = transpose(corners3d) + # box3d_pts_3d_velo = calib.project_rect_to_velo(box3d_pts_3d) + veloCorners = transpose(dot(invR0, corners3d)) # 8x3 avoid double transpose np.transpose(pts_3d_rect))) + # boxes3d_world_single, flag_imu, flag_world = calib_function.project_velo_to_world(bboxes_3d_velo=boxes3d_velo_single, Tr_matrix=transfer_matrix) + #Tr_velo_to_imu = transfer_matrix['Tr_velo_to_imu'] + #Tr_imu_to_world = transfer_matrix['Tr_imu_to_world'] + homVeloCorners = ones((veloCorners.shape[0], veloCorners.shape[1]+1)) + homVeloCorners[:,:-1] = veloCorners # 8x4 with ones in last column + imuCorners = dot(kittiCalibration['Tr_velo_imu'], homVeloCorners.T).T # 8x3 + + homImuCorners = ones((imuCorners.shape[0], imuCorners.shape[1]+1)) + homImuCorners[:,:-1] = imuCorners # 8x4 with ones in last column + worldCorners = dot(Tr_imu2w, homImuCorners.T).T # 8x3 + + # take first 4 lines of corners, x,y,_ # x0, y0, _ = boxes3d[0] + xCoords = worldCorners[:4,0] + yCoords = worldCorners[:4,1] + t.addPositionXY(xCoords.mean(), yCoords.mean()) + for i in range(4): + featureTrajectories[i].addPositionXY(xCoords[i], yCoords[i]) + # check https://docs.opencv.org/3.4/d9/d0c/group__calib3d.html#ga1019495a2c8d1743ed5cc23fa0daff8c + newObj = moving.MovingObject(num = objNum, timeInterval = moving.TimeInterval(tmp.frame.min(), tmp.frame.max()), positions = t, userType = tmp.iloc[0].type, features = [moving.MovingObject(num = featureNum+i, timeInterval = moving.TimeInterval(tmp.frame.min(), tmp.frame.max()), positions = featureTrajectories[i]) for i in range(4)]) + #newObj.featureNumbers = [f.getNum() for f in newObj.getFeatures()] + objects.append(newObj) + featureNum += 4 + return objects + def convertNgsimFile(inputfile, outputfile, append = False, nObjects = -1, sequenceNum = 0): '''Reads data from the trajectory data provided by NGSIM project and converts to our current format.''' @@ -1549,7 +1698,7 @@ return configDict -if __name__ == "__main__": +if __name__ == '__main__': import doctest import unittest suite = doctest.DocFileSuite('tests/storage.txt')
