Mercurial > hg > nsaunier > traffic-intelligence
comparison python/events.py @ 607:84690dfe5560
add some functions for behaviour analysis
| author | MohamedGomaa |
|---|---|
| date | Tue, 25 Nov 2014 22:49:47 -0500 |
| parents | 07b1bd0785cd |
| children | 0dc36203973d |
comparison
equal
deleted
inserted
replaced
| 606:75ad9c0d6cc3 | 607:84690dfe5560 |
|---|---|
| 6 from numpy import arccos | 6 from numpy import arccos |
| 7 | 7 |
| 8 import multiprocessing | 8 import multiprocessing |
| 9 import itertools | 9 import itertools |
| 10 | 10 |
| 11 import sys | |
| 11 import moving, prediction, indicators, utils | 12 import moving, prediction, indicators, utils |
| 12 | 13 sys.path.append("D:/behaviourAnalysis/libs") |
| 14 import trajLearning | |
| 13 __metaclass__ = type | 15 __metaclass__ = type |
| 14 | 16 |
| 15 class Interaction(moving.STObject): | 17 class Interaction(moving.STObject): |
| 16 '''Class for an interaction between two road users | 18 '''Class for an interaction between two road users |
| 17 or a road user and an obstacle | 19 or a road user and an obstacle |
| 145 for i in list(commonTimeInterval)[:-1]: | 147 for i in list(commonTimeInterval)[:-1]: |
| 146 if len(self.crossingZones[i]) > 0: | 148 if len(self.crossingZones[i]) > 0: |
| 147 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) | 149 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) |
| 148 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) | 150 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) |
| 149 | 151 |
| 152 def computeCrosssingCollisionsPrototypeAtInstant(self, instant,route1,route2,predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None, computeCZ = False, debug = False,useDestination=True,useSpeedPrototype=True): | |
| 153 inter1=moving.Interval(self.roadUser1.timeInterval.first,instant) | |
| 154 inter2=moving.Interval(self.roadUser2.timeInterval.first,instant) | |
| 155 partialObjPositions1= self.roadUser1.getObjectInTimeInterval(inter1).positions | |
| 156 partialObjPositions2= self.roadUser2.getObjectInTimeInterval(inter2).positions | |
| 157 if useSpeedPrototype: | |
| 158 prototypeTrajectories1=trajLearning.findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route1,partialObjPositions1,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) | |
| 159 prototypeTrajectories2=trajLearning.findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route2,partialObjPositions2,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) | |
| 160 else: | |
| 161 prototypeTrajectories1=trajLearning.findPrototypes(prototypes,nMatching,objects,route1,partialObjPositions1,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) | |
| 162 prototypeTrajectories2=trajLearning.findPrototypes(prototypes,nMatching,objects,route2,partialObjPositions2,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) | |
| 163 if prototypeTrajectories1=={}: | |
| 164 print self.roadUser1.num, 'is abnormal at instant', str(instant) | |
| 165 return [],[] | |
| 166 elif prototypeTrajectories2=={}: | |
| 167 print self.roadUser2.num, 'is abnormal at instant', str(instant) | |
| 168 return [],[] | |
| 169 else: | |
| 170 currentInstant,collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisionsAtInstant(instant, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) | |
| 171 return collisionPoints,crossingZones | |
| 172 | |
| 173 def computeCrossingsCollisionsPrototype2stages(self, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,step=10,minSimilarity=0.1,mostMatched=None, computeCZ = False, debug = False, timeInterval = None,acceptPartialLength=30,useDestination=True,useSpeedPrototype=True): | |
| 174 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
| 175 self.collisionPoints={} | |
| 176 self.crossingZones={} | |
| 177 TTCs = {} | |
| 178 route1=(self.roadUser1.startRouteID,self.roadUser1.endRouteID) | |
| 179 route2=(self.roadUser2.startRouteID,self.roadUser2.endRouteID) | |
| 180 if useDestination: | |
| 181 if route1 not in prototypes.keys(): | |
| 182 route1= trajLearning.findRoute(prototypes,objects,route1,self.roadUser1.num,noiseEntryNums,noiseExitNums) | |
| 183 if route2 not in prototypes.keys(): | |
| 184 route2= trajLearning.findRoute(prototypes,objects,route2,self.roadUser2.num,noiseEntryNums,noiseExitNums) | |
| 185 | |
| 186 if timeInterval: | |
| 187 commonTimeInterval = timeInterval | |
| 188 else: | |
| 189 commonTimeInterval = self.timeInterval | |
| 190 reCompute=False | |
| 191 for i in xrange(commonTimeInterval.first,commonTimeInterval.last,step): # incremental calculation of CP,CZ to save time | |
| 192 if i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength: | |
| 193 self.collisionPoints[i], self.crossingZones[i] = self.computeCrosssingCollisionsPrototypeAtInstant(i,route1,route2,predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched, computeCZ, debug,useDestination,useSpeedPrototype) | |
| 194 if len(self.collisionPoints[i]) >0 or len(self.crossingZones[i])>0: | |
| 195 reCompute=True | |
| 196 break | |
| 197 if reCompute: | |
| 198 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | |
| 199 if i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength: | |
| 200 self.collisionPoints[i], self.crossingZones[i] = self.computeCrosssingCollisionsPrototypeAtInstant(i,route1,route2,predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched, computeCZ, debug,useDestination,useSpeedPrototype) | |
| 201 if len(self.collisionPoints[i]) > 0: | |
| 202 TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i]) | |
| 203 # add probability of collision, and probability of successful evasive action | |
| 204 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs)) | |
| 205 | |
| 206 if computeCZ: | |
| 207 pPETs = {} | |
| 208 for i in list(commonTimeInterval)[:-1]: | |
| 209 if i in self.crossingZones.keys() and len(self.crossingZones[i]) > 0: | |
| 210 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) | |
| 211 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) | |
| 212 | |
| 150 def addVideoFilename(self,videoFilename): | 213 def addVideoFilename(self,videoFilename): |
| 151 self.videoFilename= videoFilename | 214 self.videoFilename= videoFilename |
| 152 | 215 |
| 153 def addInteractionType(self,interactionType): | 216 def addInteractionType(self,interactionType): |
| 154 ''' interaction types: conflict or collision if they are known''' | 217 ''' interaction types: conflict or collision if they are known''' |
