Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/events.py @ 1266:ebb18043616e
work in progress on categorization
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Tue, 28 May 2024 17:16:41 -0400 |
| parents | 39740c4668ac |
| children | ad60e5adf084 |
comparison
equal
deleted
inserted
replaced
| 1265:0f5bebd62a55 | 1266:ebb18043616e |
|---|---|
| 51 ''' | 51 ''' |
| 52 | 52 |
| 53 categories = {'headon': 0, | 53 categories = {'headon': 0, |
| 54 'rearend': 1, | 54 'rearend': 1, |
| 55 'side': 2, | 55 'side': 2, |
| 56 'parallel': 3} | 56 'parallel': 3, |
| 57 'stationary': 4} | |
| 57 | 58 |
| 58 indicatorNames = ['Collision Course Dot Product', | 59 indicatorNames = ['Collision Course Dot Product', |
| 59 'Collision Course Angle', | 60 'Collision Course Angle', |
| 60 'Distance', | 61 'Distance', |
| 61 'Minimum Distance', | 62 'Minimum Distance', |
| 106 elif roadUser1 is not None and roadUser2 is not None: | 107 elif roadUser1 is not None and roadUser2 is not None: |
| 107 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) | 108 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) |
| 108 else: | 109 else: |
| 109 self.roadUserNumbers = None | 110 self.roadUserNumbers = None |
| 110 self.indicators = {} | 111 self.indicators = {} |
| 111 self.interactionInterval = None | |
| 112 # list for collison points and crossing zones | 112 # list for collison points and crossing zones |
| 113 self.collisionPoints = None | 113 self.collisionPoints = None |
| 114 self.crossingZones = None | 114 self.crossingZones = None |
| 115 | 115 |
| 116 def getRoadUserNumbers(self): | 116 def getRoadUserNumbers(self): |
| 200 collisionCourseDotProducts = {} | 200 collisionCourseDotProducts = {} |
| 201 collisionCourseAngles = {} | 201 collisionCourseAngles = {} |
| 202 velocityAngles = {} | 202 velocityAngles = {} |
| 203 distances = {} | 203 distances = {} |
| 204 speedDifferentials = {} | 204 speedDifferentials = {} |
| 205 interactionInstants = [] | |
| 206 for instant in self.timeInterval: | 205 for instant in self.timeInterval: |
| 207 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) | 206 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) |
| 208 v1 = self.roadUser1.getVelocityAtInstant(instant) | 207 v1 = self.roadUser1.getVelocityAtInstant(instant) |
| 209 v2 = self.roadUser2.getVelocityAtInstant(instant) | 208 v2 = self.roadUser2.getVelocityAtInstant(instant) |
| 210 deltav = v2-v1 | 209 deltav = v2-v1 |
| 213 if v1Norm != 0. and v2Norm != 0.: | 212 if v1Norm != 0. and v2Norm != 0.: |
| 214 velocityAngles[instant] = np.arccos(max(-1, min(1, moving.Point.dot(v1, v2)/(v1Norm*v2Norm)))) | 213 velocityAngles[instant] = np.arccos(max(-1, min(1, moving.Point.dot(v1, v2)/(v1Norm*v2Norm)))) |
| 215 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) | 214 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) |
| 216 distances[instant] = deltap.norm2() | 215 distances[instant] = deltap.norm2() |
| 217 speedDifferentials[instant] = deltav.norm2() | 216 speedDifferentials[instant] = deltav.norm2() |
| 218 if collisionCourseDotProducts[instant] > 0: | |
| 219 interactionInstants.append(instant) | |
| 220 if distances[instant] != 0 and speedDifferentials[instant] != 0: | 217 if distances[instant] != 0 and speedDifferentials[instant] != 0: |
| 221 collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0 | 218 collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0 |
| 222 | 219 |
| 223 if len(interactionInstants) >= 2: | |
| 224 self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) | |
| 225 else: | |
| 226 self.interactionInterval = moving.TimeInterval() | |
| 227 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) | 220 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) |
| 228 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) | 221 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) |
| 229 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) | 222 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) |
| 230 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) | 223 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) |
| 231 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) | 224 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) |
| 235 minDistances={} | 228 minDistances={} |
| 236 for instant in self.timeInterval: | 229 for instant in self.timeInterval: |
| 237 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) | 230 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) |
| 238 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) | 231 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) |
| 239 | 232 |
| 240 def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None): | 233 def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None, speedThreshold = 0.): |
| 241 '''Computes the interaction category by instant | 234 '''Computes the interaction category by instant |
| 242 all 3 angle arguments in radian | 235 all 3 angle arguments in radian |
| 243 velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None) | 236 velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None) |
| 244 parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector | 237 parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector |
| 245 | 238 |
| 246 an instant may not be categorized if it matches the side definition (angle) | 239 an instant may not be categorized if it matches the side definition (angle) |
| 247 but the distance is growing (at least one user is probably past the point of trajectory crossing)''' | 240 but the distance is growing (at least one user is probably past the point of trajectory crossing)''' |
| 248 parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) | 241 parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) |
| 249 if headonCollisionCourseAngleTolerance is None: | 242 if headonCollisionCourseAngleTolerance is None: |
| 250 headonCollisionCourseAngleTolerance = velocityAngleTolerance | 243 headonCollisionCourseAngleTolerance = velocityAngleTolerance |
| 244 speedThreshold2 = speedThreshold**2 | |
| 251 | 245 |
| 252 self.categories = {} | 246 self.categories = {} |
| 253 collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) | 247 collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) |
| 254 collisionCourseAngles = self.getIndicator(Interaction.indicatorNames[1]) | 248 collisionCourseAngles = self.getIndicator(Interaction.indicatorNames[1]) |
| 255 distances = self.getIndicator(Interaction.indicatorNames[2]) | 249 distances = self.getIndicator(Interaction.indicatorNames[2]) |
| 256 velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) | 250 velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) |
| 257 for instant in self.timeInterval: | 251 for instant in self.timeInterval: |
| 258 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end | 252 if instant in velocityAngles: |
| 259 midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) | 253 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end |
| 260 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) | 254 midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) |
| 261 if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: | 255 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) |
| 262 self.categories[instant] = Interaction.categories["parallel"] | 256 if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: |
| 263 else: | 257 self.categories[instant] = Interaction.categories["parallel"] |
| 264 self.categories[instant] = Interaction.categories["rearend"] | 258 else: |
| 265 elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on | 259 self.categories[instant] = Interaction.categories["rearend"] |
| 266 self.categories[instant] = Interaction.categories["headon"] | 260 elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on |
| 267 elif collisionCourseDotProducts[instant] > 0: | 261 self.categories[instant] = Interaction.categories["headon"] |
| 268 self.categories[instant] = Interaction.categories["side"] | 262 elif collisionCourseDotProducts[instant] > 0: |
| 263 self.categories[instant] = Interaction.categories["side"] | |
| 264 # true stationary is when object does not move for the whole period of the interaction, otherwise get last (or next) velocity vector for user orientation | |
| 265 # if instant not in self.categories: # if it's none of the other categories (could be with almost stationary vehicle) and only one speed is 0 | |
| 266 # stationaryUser1 = self.roadUser1.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2 | |
| 267 # stationaryUser2 = self.roadUser2.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2 | |
| 268 # if stationaryUser1 != stationaryUser2 and collisionCourseDotProducts[instant] > 0: # only one is not moving | |
| 269 # self.categories[instant] = Interaction.categories["stationary"] | |
| 270 # leaving is not a good interaction category (issue in Etienne's 2022 paper): means we are past the situation in which users are approaching | |
| 271 # could try to predict what happened before, but it's not observed | |
| 272 | |
| 269 | 273 |
| 270 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | 274 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): |
| 271 '''Computes all crossing and collision points at each common instant for two road users. ''' | 275 '''Computes all crossing and collision points at each common instant for two road users. ''' |
| 272 TTCs = {} | 276 TTCs = {} |
| 273 collisionProbabilities = {} | 277 collisionProbabilities = {} |
