Mercurial > hg > nsaunier > traffic-intelligence
comparison python/prediction.py @ 359:619ae9a9a788
implemented prediction method at constant velocity with direct intersection computation
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Thu, 11 Jul 2013 02:17:12 -0400 |
| parents | c41ff9f3c263 |
| children | 91679eb2ff2c |
comparison
equal
deleted
inserted
replaced
| 358:c41ff9f3c263 | 359:619ae9a9a788 |
|---|---|
| 61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
| 62 | 62 |
| 63 def getControl(self): | 63 def getControl(self): |
| 64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) | 64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) |
| 65 | 65 |
| 66 class SafetyPoint(moving.Point): | |
| 67 '''Can represent a collision point or crossing zone | |
| 68 with respective safety indicator, TTC or pPET''' | |
| 69 def __init__(self, p, probability = 1., indicator = -1): | |
| 70 self.x = p.x | |
| 71 self.y = p.y | |
| 72 self.probability = probability | |
| 73 self.indicator = indicator | |
| 74 | |
| 75 def __str__(self): | |
| 76 return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) | |
| 77 | |
| 78 @staticmethod | |
| 79 def save(out, points, predictionInstant, objNum1, objNum2): | |
| 80 for p in points: | |
| 81 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) | |
| 82 | |
| 83 @staticmethod | |
| 84 def computeExpectedIndicator(points): | |
| 85 from numpy import sum | |
| 86 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | |
| 87 | |
| 66 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | 88 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): |
| 67 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' | 89 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' |
| 68 t = 1 | 90 t = 1 |
| 69 p1 = predictedTrajectory1.predictPosition(t) | 91 p1 = predictedTrajectory1.predictPosition(t) |
| 70 p2 = predictedTrajectory2.predictPosition(t) | 92 p2 = predictedTrajectory2.predictPosition(t) |
| 84 | 106 |
| 85 def generatePredictedTrajectories(self, obj, instant): | 107 def generatePredictedTrajectories(self, obj, instant): |
| 86 return [] | 108 return [] |
| 87 | 109 |
| 88 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | 110 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): |
| 89 '''returns the lists of collision points and crossing zones | 111 '''returns the lists of collision points and crossing zones''' |
| 90 | |
| 91 Check: Predicting all the points together, as if they represent the whole vehicle''' | |
| 92 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) | 112 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) |
| 93 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | 113 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) |
| 94 | 114 |
| 95 collisionPoints = [] | 115 collisionPoints = [] |
| 96 crossingZones = [] | 116 crossingZones = [] |
| 286 moving.NormAngle(self.accelerationDistribution(), | 306 moving.NormAngle(self.accelerationDistribution(), |
| 287 self.steeringDistribution()), | 307 self.steeringDistribution()), |
| 288 maxSpeed = self.maxSpeed)) | 308 maxSpeed = self.maxSpeed)) |
| 289 return predictedTrajectories | 309 return predictedTrajectories |
| 290 | 310 |
| 291 class SafetyPoint(moving.Point): | 311 |
| 292 '''Can represent a collision point or crossing zone | 312 class CVDirectPredictionParameters(PredictionParameters): |
| 293 with respective safety indicator, TTC or pPET''' | 313 '''Prediction parameters of prediction at constant velocity |
| 294 def __init__(self, p, probability = 1., indicator = -1): | 314 using direct computation of the intersecting point''' |
| 295 self.x = p.x | 315 |
| 296 self.y = p.y | 316 def __init__(self, maxSpeed): |
| 297 self.probability = probability | 317 PredictionParameters.__init__(self, 'constant velocity (direct computation)', maxSpeed) |
| 298 self.indicator = indicator | 318 |
| 299 | 319 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): |
| 300 def __str__(self): | 320 collisionPoints = [] |
| 301 return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) | 321 crossingZones = [] |
| 302 | 322 |
| 303 @staticmethod | 323 p1 = obj1.getPositionAtInstant(currentInstant) |
| 304 def save(out, points, predictionInstant, objNum1, objNum2): | 324 p2 = obj2.getPositionAtInstant(currentInstant) |
| 305 for p in points: | 325 if (p1-p2).norm2() <= collisionDistanceThreshold: |
| 306 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) | 326 collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)] |
| 307 | 327 else: |
| 308 @staticmethod | 328 v1 = obj1.getVelocityAtInstant(currentInstant) |
| 309 def computeExpectedIndicator(points): | 329 v2 = obj2.getVelocityAtInstant(currentInstant) |
| 310 from numpy import sum | 330 intersection = moving.intersection(p1, p2, v1, v2) |
| 311 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | 331 |
| 332 if intersection != None: | |
| 333 dp1 = intersection-p1 | |
| 334 dp2 = intersection-p2 | |
| 335 if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection | |
| 336 dist1 = dp1.norm2() | |
| 337 dist2 = dp2.norm2() | |
| 338 s1 = v1.norm2() | |
| 339 s2 = v2.norm2() | |
| 340 halfCollisionDistanceThreshold = collisionDistanceThreshold/2. | |
| 341 timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) | |
| 342 timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) | |
| 343 collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) | |
| 344 if not collisionTimeInterval.empty(): | |
| 345 collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] | |
| 346 else: | |
| 347 crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] | |
| 348 | |
| 349 if debug and intersection!= None: | |
| 350 from matplotlib.pyplot import plot, figure, axis, title | |
| 351 figure() | |
| 352 plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') | |
| 353 plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') | |
| 354 intersection.draw() | |
| 355 obj1.draw('r') | |
| 356 obj2.draw('b') | |
| 357 title('instant {0}'.format(currentInstant)) | |
| 358 axis('equal') | |
| 359 | |
| 360 return collisionPoints, crossingZones | |
| 361 | |
| 362 | |
| 312 | 363 |
| 313 | 364 |
| 314 #### | 365 #### |
| 315 # Other Methods | 366 # Other Methods |
| 316 #### | 367 #### |
