Mercurial > hg > nsaunier > traffic-intelligence
diff python/moving.py @ 708:a37c565f4b68
merged dev
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Wed, 22 Jul 2015 14:17:44 -0400 |
| parents | 463150a8e129 |
| children | e14e2101a5a9 |
line wrap: on
line diff
--- a/python/moving.py Wed Jul 22 14:17:19 2015 -0400 +++ b/python/moving.py Wed Jul 22 14:17:44 2015 -0400 @@ -5,7 +5,7 @@ from base import VideoFilenameAddable from math import sqrt, atan2, cos, sin -from numpy import median, array, zeros, hypot, NaN, std +from numpy import median, array, zeros, hypot, NaN, std, floor, float32 from matplotlib.pyplot import plot from scipy.stats import scoreatpercentile from scipy.spatial.distance import cdist @@ -78,7 +78,6 @@ else: return None - def unionIntervals(intervals): 'returns the smallest interval containing all intervals' inter = intervals[0] @@ -123,6 +122,9 @@ '''Returns the length of the interval''' return float(max(0,self.last-self.first+1)) + def __len__(self): + return self.length() + # class BoundingPolygon: # '''Class for a polygon bounding a set of points # with methods to create intersection, unions... @@ -142,11 +144,17 @@ self.boundingPolygon = boundingPolygon def empty(self): - return self.timeInterval.empty() or not self.boudingPolygon + return self.timeInterval.empty()# or not self.boudingPolygon def getNum(self): return self.num + def __len__(self): + return self.timeInterval.length() + + def length(self): + return self.timeInterval.length() + def getFirstInstant(self): return self.timeInterval.first @@ -190,8 +198,12 @@ else: raise IndexError() - def orthogonal(self): - return Point(self.y, -self.x) + def orthogonal(self, clockwise = True): + 'Returns the orthogonal vector' + if clockwise: + return Point(self.y, -self.x) + else: + return Point(-self.y, self.x) def multiply(self, alpha): 'Warning, returns a new Point' @@ -1055,9 +1067,6 @@ print 'The object does not exist at '+str(inter) return None - def length(self): - return self.timeInterval.length() - def getPositions(self): return self.positions @@ -1129,8 +1138,13 @@ print('Object {} has no features loaded.'.format(self.getNum())) return None - def getSpeeds(self): - return self.getVelocities().norm() + def getSpeeds(self, nInstantsIgnoredAtEnds = 0): + speeds = self.getVelocities().norm() + if nInstantsIgnoredAtEnds > 0: + n = min(nInstantsIgnoredAtEnds, int(floor(self.length()/2.))) + return speeds[n:-n] + else: + return speeds def getSpeedIndicator(self): from indicators import SeverityIndicator @@ -1346,23 +1360,20 @@ ### # User Type Classification ### - def classifyUserTypeSpeedMotorized(self, threshold, aggregationFunc = median, ignoreNInstantsAtEnds = 0): + def classifyUserTypeSpeedMotorized(self, threshold, aggregationFunc = median, nInstantsIgnoredAtEnds = 0): '''Classifies slow and fast road users slow: non-motorized -> pedestrians fast: motorized -> cars aggregationFunc can be any function that can be applied to a vector of speeds, including percentile: aggregationFunc = lambda x: percentile(x, percentileFactor) # where percentileFactor is 85 for 85th percentile''' - if ignoreNInstantsAtEnds > 0: - speeds = self.getSpeeds()[ignoreNInstantsAtEnds:-ignoreNInstantsAtEnds] - else: - speeds = self.getSpeeds() + speeds = self.getSpeeds(nInstantsIgnoredAtEnds) if aggregationFunc(speeds) >= threshold: self.setUserType(userType2Num['car']) else: self.setUserType(userType2Num['pedestrian']) - def classifyUserTypeSpeed(self, speedProbabilities, aggregationFunc = median): + def classifyUserTypeSpeed(self, speedProbabilities, aggregationFunc = median, nInstantsIgnoredAtEnds = 0): '''Classifies road user per road user type speedProbabilities are functions return P(speed|class) in a dictionary indexed by user type names @@ -1375,62 +1386,67 @@ else: return x''' if not hasattr(self, 'aggregatedSpeed'): - self.aggregatedSpeed = aggregationFunc(self.getSpeeds()) + self.aggregatedSpeed = aggregationFunc(self.getSpeeds(nInstantsIgnoredAtEnds)) userTypeProbabilities = {} for userTypename in speedProbabilities: userTypeProbabilities[userType2Num[userTypename]] = speedProbabilities[userTypename](self.aggregatedSpeed) self.setUserType(utils.argmaxDict(userTypeProbabilities)) return userTypeProbabilities - def initClassifyUserTypeHoGSVM(self, aggregationFunc = median): + def initClassifyUserTypeHoGSVM(self, aggregationFunc, pedBikeCarSVM, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), nInstantsIgnoredAtEnds = 0): '''Initializes the data structures for classification - TODO? compute speed for longest feature? - Skip beginning and end of feature for speed? Offer options instead of median''' - self.aggregatedSpeed = aggregationFunc(self.getSpeeds()) + TODO? compute speed for longest feature?''' + self.aggregatedSpeed = aggregationFunc(self.getSpeeds(nInstantsIgnoredAtEnds)) + if self.aggregatedSpeed < pedBikeSpeedTreshold or bikeCarSVM is None: + self.appearanceClassifier = pedBikeCarSVM + elif self.aggregatedSpeed < bikeCarSpeedThreshold: + self.appearanceClassifier = bikeCarSVM + else: + class CarClassifier: + def predict(self, hog): + return userType2Num['car'] + self.appearanceClassifier = CarClassifier() + self.userTypes = {} - def classifyUserTypeHoGSVMAtInstant(self, img, pedBikeCarSVM, instant, homography, width, height, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), px = 0.2, py = 0.2, pixelThreshold = 800): + def classifyUserTypeHoGSVMAtInstant(self, img, instant, homography, width, height, px = 0.2, py = 0.2, minNPixels = 800): '''Extract the image box around the object and applies the SVM model on it''' - croppedImg, yCropMin, yCropMax, xCropMin, xCropMax = imageBox(img, self, instant, homography, width, height, px, py, pixelThreshold) - if len(croppedImg) > 0: # != [] - hog = array([cvutils.HOG(croppedImg)], dtype = np.float32) - if self.aggregatedSpeed < pedBikeSpeedTreshold or bikeCarSVM is None: - self.userTypes[instant] = int(pedBikeCarSVM.predict(hog)) - elif self.aggregatedSpeed < bikeCarSpeedTreshold: - self.userTypes[instant] = int(bikeCarSVM.predict(hog)) - else: - self.userTypes[instant] = userType2Num['car'] + croppedImg, yCropMin, yCropMax, xCropMin, xCropMax = cvutils.imageBox(img, self, instant, homography, width, height, px, py, minNPixels) + if croppedImg is not None and len(croppedImg) > 0: + hog = cvutils.HOG(croppedImg)#HOG(image, rescaleSize = (64, 64), orientations=9, pixelsPerCell=(8, 8), cellsPerBlock=(2, 2), visualize=False, normalize=False) + self.userTypes[instant] = int(self.appearanceClassifier.predict(hog)) else: self.userTypes[instant] = userType2Num['unknown'] - def classifyUserTypeHoGSVM(self, images, pedBikeCarSVM, homography, width, height, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), speedProbabilities = None, aggregationFunc = median, px = 0.2, py = 0.2, pixelThreshold = 800): + def classifyUserTypeHoGSVM(self, pedBikeCarSVM = None, width = 0, height = 0, homography = None, images = None, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), minSpeedEquiprobable = -1, speedProbabilities = None, aggregationFunc = median, nInstantsIgnoredAtEnds = 0, px = 0.2, py = 0.2, minNPixels = 800): '''Agregates SVM detections in each image and returns probability (proportion of instants with classification in each category) - iamges is a dictionary of images indexed by instant + images is a dictionary of images indexed by instant With default parameters, the general (ped-bike-car) classifier will be used - TODO? consider all categories?''' - if not hasattr(self, aggregatedSpeed) or not hasattr(self, userTypes): + + Considered categories are the keys of speedProbabilities''' + if not hasattr(self, 'aggregatedSpeed') or not hasattr(self, 'userTypes'): print('Initilize the data structures for classification by HoG-SVM') - self.initClassifyUserTypeHoGSVM(aggregationFunc) + self.initClassifyUserTypeHoGSVM(aggregationFunc, pedBikeCarSVM, bikeCarSVM, pedBikeSpeedTreshold, bikeCarSpeedThreshold, nInstantsIgnoredAtEnds) - if len(self.userTypes) != self.length(): # if classification has not been done previously + if len(self.userTypes) != self.length() and images is not None: # if classification has not been done previously for t in self.getTimeInterval(): if t not in self.userTypes: - self.classifyUserTypeHoGSVMAtInstant(images[t], pedBikeCarSVM, t, homography, width, height, bikeCarSVM, pedBikeSpeedTreshold, bikeCarSpeedThreshold, px, py, pixelThreshold) + self.classifyUserTypeHoGSVMAtInstant(images[t], t, homography, width, height, px, py, minNPixels) # compute P(Speed|Class) - if speedProbabilities is None: # equiprobable information from speed + if speedProbabilities is None or self.aggregatedSpeed < minSpeedEquiprobable: # equiprobable information from speed userTypeProbabilities = {userType2Num['car']: 1., userType2Num['pedestrian']: 1., userType2Num['bicycle']: 1.} else: userTypeProbabilities = {userType2Num[userTypename]: speedProbabilities[userTypename](self.aggregatedSpeed) for userTypename in speedProbabilities} # result is P(Class|Appearance) x P(Speed|Class) - nInstantsUserType = {userType2Num[userTypename]: 0 for userTypename in userTypeProbabilities}# number of instants the object is classified as userTypename + nInstantsUserType = {userTypeNum: 0 for userTypeNum in userTypeProbabilities}# number of instants the object is classified as userTypename for t in self.userTypes: - nInstantsUserType[self.userTypes[t]] += 1 - for userTypename in userTypeProbabilities: - userTypeProbabilities[userTypename] *= nInstantsUserType[userTypename] + nInstantsUserType[self.userTypes[t]] = nInstantsUserType.get(self.userTypes[t], 0) + 1 + for userTypeNum in userTypeProbabilities: + userTypeProbabilities[userTypeNum] *= nInstantsUserType[userTypeNum] # class is the user type that maximizes usertype probabilities self.setUserType(utils.argmaxDict(userTypeProbabilities))
