Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/events.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Fri, 15 Jun 2018 11:19:10 -0400 |
| parents | python/events.py@933670761a57 |
| children | 6a8fe3ed3bc6 |
comparison
equal
deleted
inserted
replaced
| 1027:6129296848d3 | 1028:cc5cb04b04b0 |
|---|---|
| 1 #! /usr/bin/env python | |
| 2 '''Libraries for events | |
| 3 Interactions, pedestrian crossing...''' | |
| 4 | |
| 5 from trafficintelligence import moving, prediction, indicators, utils, cvutils, ml | |
| 6 from trafficintelligence.base import VideoFilenameAddable | |
| 7 | |
| 8 import numpy as np | |
| 9 | |
| 10 import multiprocessing | |
| 11 import itertools, logging | |
| 12 | |
| 13 | |
| 14 def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180): | |
| 15 if i[0] not in noiseEntryNums: | |
| 16 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if i[0]==x[0]] | |
| 17 elif i[1] not in noiseExitNums: | |
| 18 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if i[1]==x[1]] | |
| 19 else: | |
| 20 prototypesRoutes=[x for x in sorted(prototypes.keys())] | |
| 21 routeSim={} | |
| 22 lcss = utils.LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) | |
| 23 for y in prototypesRoutes: | |
| 24 if y in prototypes: | |
| 25 prototypesIDs=prototypes[y] | |
| 26 similarity=[] | |
| 27 for x in prototypesIDs: | |
| 28 s=lcss.computeNormalized(objects[j].positions, objects[x].positions) | |
| 29 similarity.append(s) | |
| 30 routeSim[y]=max(similarity) | |
| 31 route=max(routeSim, key=routeSim.get) | |
| 32 if routeSim[route]>=minSimilarity: | |
| 33 return route | |
| 34 else: | |
| 35 return i | |
| 36 | |
| 37 def getRoute(obj,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination=True): | |
| 38 route=(obj.startRouteID,obj.endRouteID) | |
| 39 if useDestination: | |
| 40 if route not in prototypes: | |
| 41 route= findRoute(prototypes,objects,route,obj.getNum(),noiseEntryNums,noiseExitNums) | |
| 42 return route | |
| 43 | |
| 44 class Interaction(moving.STObject, VideoFilenameAddable): | |
| 45 '''Class for an interaction between two road users | |
| 46 or a road user and an obstacle | |
| 47 | |
| 48 link to the moving objects | |
| 49 contains the indicators in a dictionary with the names as keys | |
| 50 ''' | |
| 51 | |
| 52 categories = {'Head On': 0, | |
| 53 'rearend': 1, | |
| 54 'side': 2, | |
| 55 'parallel': 3} | |
| 56 | |
| 57 indicatorNames = ['Collision Course Dot Product', | |
| 58 'Collision Course Angle', | |
| 59 'Distance', | |
| 60 'Minimum Distance', | |
| 61 'Velocity Angle', | |
| 62 'Speed Differential', | |
| 63 'Collision Probability', | |
| 64 'Time to Collision', # 7 | |
| 65 'Probability of Successful Evasive Action', | |
| 66 'predicted Post Encroachment Time', | |
| 67 'Post Encroachment Time'] | |
| 68 | |
| 69 indicatorNameToIndices = utils.inverseEnumeration(indicatorNames) | |
| 70 | |
| 71 indicatorShortNames = ['CCDP', | |
| 72 'CCA', | |
| 73 'Dist', | |
| 74 'MinDist', | |
| 75 'VA', | |
| 76 'SD', | |
| 77 'PoC', | |
| 78 'TTC', | |
| 79 'P(SEA)', | |
| 80 'pPET', | |
| 81 'PET'] | |
| 82 | |
| 83 indicatorUnits = ['', | |
| 84 'rad', | |
| 85 'm', | |
| 86 'm', | |
| 87 'rad', | |
| 88 'm/s', | |
| 89 '', | |
| 90 's', | |
| 91 '', | |
| 92 's', | |
| 93 's'] | |
| 94 | |
| 95 timeIndicators = ['Time to Collision', 'predicted Post Encroachment Time'] | |
| 96 | |
| 97 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None): | |
| 98 moving.STObject.__init__(self, num, timeInterval) | |
| 99 if timeInterval is None and roadUser1 is not None and roadUser2 is not None: | |
| 100 self.timeInterval = roadUser1.commonTimeInterval(roadUser2) | |
| 101 self.roadUser1 = roadUser1 | |
| 102 self.roadUser2 = roadUser2 | |
| 103 if roaduserNum1 is not None and roaduserNum2 is not None: | |
| 104 self.roadUserNumbers = set([roaduserNum1, roaduserNum2]) | |
| 105 elif roadUser1 is not None and roadUser2 is not None: | |
| 106 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) | |
| 107 else: | |
| 108 self.roadUserNumbers = None | |
| 109 self.categoryNum = categoryNum | |
| 110 self.indicators = {} | |
| 111 self.interactionInterval = None | |
| 112 # list for collison points and crossing zones | |
| 113 self.collisionPoints = None | |
| 114 self.crossingZones = None | |
| 115 | |
| 116 def getRoadUserNumbers(self): | |
| 117 return self.roadUserNumbers | |
| 118 | |
| 119 def setRoadUsers(self, objects): | |
| 120 nums = sorted(list(self.getRoadUserNumbers())) | |
| 121 if nums[0]<len(objects) and objects[nums[0]].getNum() == nums[0]: | |
| 122 self.roadUser1 = objects[nums[0]] | |
| 123 if nums[1]<len(objects) and objects[nums[1]].getNum() == nums[1]: | |
| 124 self.roadUser2 = objects[nums[1]] | |
| 125 | |
| 126 if self.roadUser1 is None or self.roadUser2 is None: | |
| 127 self.roadUser1 = None | |
| 128 self.roadUser2 = None | |
| 129 i = 0 | |
| 130 while i < len(objects) and self.roadUser2 is None: | |
| 131 if objects[i].getNum() in nums: | |
| 132 if self.roadUser1 is None: | |
| 133 self.roadUser1 = objects[i] | |
| 134 else: | |
| 135 self.roadUser2 = objects[i] | |
| 136 i += 1 | |
| 137 | |
| 138 def getIndicator(self, indicatorName): | |
| 139 return self.indicators.get(indicatorName, None) | |
| 140 | |
| 141 def addIndicator(self, indicator): | |
| 142 if indicator is not None: | |
| 143 self.indicators[indicator.name] = indicator | |
| 144 | |
| 145 def getIndicatorValueAtInstant(self, indicatorName, instant): | |
| 146 indicator = self.getIndicator(indicatorName) | |
| 147 if indicator is not None: | |
| 148 return indicator[instant] | |
| 149 else: | |
| 150 return None | |
| 151 | |
| 152 def getIndicatorValuesAtInstant(self, instant): | |
| 153 '''Returns list of indicator values at instant | |
| 154 as dict (with keys from indicators dict)''' | |
| 155 values = {} | |
| 156 for k, indicator in self.indicators.items(): | |
| 157 values[k] = indicator[instant] | |
| 158 return values | |
| 159 | |
| 160 def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs): | |
| 161 self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs) | |
| 162 self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs) | |
| 163 | |
| 164 def plotOnWorldImage(self, nPixelsPerUnitDistance, options = '', withOrigin = False, timeStep = 1, **kwargs): | |
| 165 self.roadUser1.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs) | |
| 166 self.roadUser2.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs) | |
| 167 | |
| 168 def play(self, videoFilename, homography = None, undistort = False, intrinsicCameraMatrix = None, distortionCoefficients = None, undistortedImageMultiplication = 1., allUserInstants = False): | |
| 169 if self.roadUser1 is not None and self.roadUser2 is not None: | |
| 170 if allUserInstants: | |
| 171 firstFrameNum = min(self.roadUser1.getFirstInstant(), self.roadUser2.getFirstInstant()) | |
| 172 lastFrameNum = max(self.roadUser1.getLastInstant(), self.roadUser2.getLastInstant()) | |
| 173 else: | |
| 174 firstFrameNum = self.getFirstInstant() | |
| 175 lastFrameNum = self.getLastInstant() | |
| 176 cvutils.displayTrajectories(videoFilename, [self.roadUser1, self.roadUser2], homography = homography, firstFrameNum = firstFrameNum, lastFrameNumArg = lastFrameNum, undistort = undistort, intrinsicCameraMatrix = intrinsicCameraMatrix, distortionCoefficients = distortionCoefficients, undistortedImageMultiplication = undistortedImageMultiplication) | |
| 177 else: | |
| 178 print('Please set the interaction road user attributes roadUser1 and roadUser1 through the method setRoadUsers') | |
| 179 | |
| 180 def computeIndicators(self): | |
| 181 '''Computes the collision course cosine only if the cosine is positive''' | |
| 182 collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) | |
| 183 collisionCourseAngles = {} | |
| 184 velocityAngles = {} | |
| 185 distances = {}#[0]*int(self.timeInterval.length()) | |
| 186 speedDifferentials = {} | |
| 187 interactionInstants = [] | |
| 188 for instant in self.timeInterval: | |
| 189 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) | |
| 190 v1 = self.roadUser1.getVelocityAtInstant(instant) | |
| 191 v2 = self.roadUser2.getVelocityAtInstant(instant) | |
| 192 deltav = v2-v1 | |
| 193 velocityAngles[instant] = np.arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2())) | |
| 194 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) | |
| 195 distances[instant] = deltap.norm2() | |
| 196 speedDifferentials[instant] = deltav.norm2() | |
| 197 if collisionCourseDotProducts[instant] > 0: | |
| 198 interactionInstants.append(instant) | |
| 199 if distances[instant] != 0 and speedDifferentials[instant] != 0: | |
| 200 collisionCourseAngles[instant] = np.arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) | |
| 201 | |
| 202 if len(interactionInstants) >= 2: | |
| 203 self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) | |
| 204 else: | |
| 205 self.interactionInterval = moving.TimeInterval() | |
| 206 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) | |
| 207 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) | |
| 208 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) | |
| 209 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) | |
| 210 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) | |
| 211 | |
| 212 # if we have features, compute other indicators | |
| 213 if self.roadUser1.hasFeatures() and self.roadUser2.hasFeatures(): | |
| 214 minDistances={} | |
| 215 for instant in self.timeInterval: | |
| 216 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) | |
| 217 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) | |
| 218 | |
| 219 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
| 220 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
| 221 TTCs = {} | |
| 222 collisionProbabilities = {} | |
| 223 if timeInterval is not None: | |
| 224 commonTimeInterval = timeInterval | |
| 225 else: | |
| 226 commonTimeInterval = self.timeInterval | |
| 227 self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval) | |
| 228 for i, cps in self.collisionPoints.items(): | |
| 229 TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps) | |
| 230 collisionProbabilities[i] = sum([p.probability for p in cps]) | |
| 231 if len(TTCs) > 0: | |
| 232 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs, mostSevereIsMax=False)) | |
| 233 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[6], collisionProbabilities)) | |
| 234 | |
| 235 # crossing zones and pPET | |
| 236 if computeCZ: | |
| 237 self.crossingZones = crossingZones | |
| 238 pPETs = {} | |
| 239 for i, cz in self.crossingZones.items(): | |
| 240 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz) | |
| 241 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False)) | |
| 242 # TODO add probability of collision, and probability of successful evasive action | |
| 243 | |
| 244 def computePET(self, collisionDistanceThreshold): | |
| 245 pet, t1, t2= moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold) | |
| 246 if pet is not None: | |
| 247 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[10], {min(t1, t2): pet}, mostSevereIsMax = False)) | |
| 248 | |
| 249 def setCollision(self, collision): | |
| 250 '''indicates if it is a collision: argument should be boolean''' | |
| 251 self.collision = collision | |
| 252 | |
| 253 def isCollision(self): | |
| 254 if hasattr(self, 'collision'): | |
| 255 return self.collision | |
| 256 else: | |
| 257 return None | |
| 258 | |
| 259 def getCollisionPoints(self): | |
| 260 return self.collisionPoints | |
| 261 | |
| 262 def getCrossingZones(self): | |
| 263 return self.crossingZones | |
| 264 | |
| 265 def createInteractions(objects, _others = None): | |
| 266 '''Create all interactions of two co-existing road users''' | |
| 267 if _others is not None: | |
| 268 others = _others | |
| 269 | |
| 270 interactions = [] | |
| 271 num = 0 | |
| 272 for i in range(len(objects)): | |
| 273 if _others is None: | |
| 274 others = objects[:i] | |
| 275 for j in range(len(others)): | |
| 276 commonTimeInterval = objects[i].commonTimeInterval(others[j]) | |
| 277 if not commonTimeInterval.empty(): | |
| 278 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j])) | |
| 279 num += 1 | |
| 280 return interactions | |
| 281 | |
| 282 def findInteraction(interactions, roadUserNum1, roadUserNum2): | |
| 283 'Returns the right interaction in the set' | |
| 284 i=0 | |
| 285 while i<len(interactions) and set([roadUserNum1, roadUserNum2]) != interactions[i].getRoadUserNumbers(): | |
| 286 i+=1 | |
| 287 if i<len(interactions): | |
| 288 return interactions[i] | |
| 289 else: | |
| 290 return None | |
| 291 | |
| 292 def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
| 293 for inter in interactions: | |
| 294 print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum())) | |
| 295 inter.computeIndicators() | |
| 296 if computeMotionPrediction: | |
| 297 inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval) | |
| 298 if computePET: | |
| 299 inter.computePET(collisionDistanceThreshold) | |
| 300 return interactions | |
| 301 | |
| 302 def aggregateSafetyPoints(interactions, pointType = 'collision'): | |
| 303 '''Put all collision points or crossing zones in a list for display''' | |
| 304 allPoints = [] | |
| 305 if pointType == 'collision': | |
| 306 for i in interactions: | |
| 307 for points in i.collisionPoints.values(): | |
| 308 allPoints += points | |
| 309 elif pointType == 'crossing': | |
| 310 for i in interactions: | |
| 311 for points in i.crossingZones.values(): | |
| 312 allPoints += points | |
| 313 else: | |
| 314 print('unknown type of point: '+pointType) | |
| 315 return allPoints | |
| 316 | |
| 317 def prototypeCluster(interactions, similarities, indicatorName, minSimilarity, similarityFunc = None, minClusterSize = None, randomInitialization = False): | |
| 318 return ml.prototypeCluster([inter.getIndicator(indicatorName) for inter in interactions], similarities, minSimilarity, similarityFunc, minClusterSize, randomInitialization) | |
| 319 | |
| 320 class Crossing(moving.STObject): | |
| 321 '''Class for the event of a street crossing | |
| 322 | |
| 323 TODO: detecter passage sur la chaussee | |
| 324 identifier origines et destination (ou uniquement chaussee dans FOV) | |
| 325 carac traversee | |
| 326 detecter proximite veh (retirer si trop similaire simultanement | |
| 327 carac interaction''' | |
| 328 | |
| 329 def __init__(self, roaduserNum = None, num = None, timeInterval = None): | |
| 330 moving.STObject.__init__(self, num, timeInterval) | |
| 331 self.roaduserNum = roaduserNum | |
| 332 | |
| 333 | |
| 334 | |
| 335 if __name__ == "__main__": | |
| 336 import doctest | |
| 337 import unittest | |
| 338 suite = doctest.DocFileSuite('tests/events.txt') | |
| 339 #suite = doctest.DocTestSuite() | |
| 340 unittest.TextTestRunner().run(suite) | |
| 341 |
