Mercurial > hg > nsaunier > traffic-intelligence
comparison python/prediction.py @ 604:8ba4b8ad4c86
Motion Pattern Method
| author | MohamedGomaa |
|---|---|
| date | Tue, 15 Jul 2014 13:25:15 -0400 |
| parents | 727e3c529519 |
| children |
comparison
equal
deleted
inserted
replaced
| 548:e6ab4caf359c | 604:8ba4b8ad4c86 |
|---|---|
| 2 '''Library for motion prediction methods''' | 2 '''Library for motion prediction methods''' |
| 3 | 3 |
| 4 import moving | 4 import moving |
| 5 import math | 5 import math |
| 6 import random | 6 import random |
| 7 import numpy as np | |
| 7 | 8 |
| 8 class PredictedTrajectory: | 9 class PredictedTrajectory: |
| 9 '''Class for predicted trajectories with lazy evaluation | 10 '''Class for predicted trajectories with lazy evaluation |
| 10 if the predicted position has not been already computed, compute it | 11 if the predicted position has not been already computed, compute it |
| 11 | 12 |
| 45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 46 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
| 46 | 47 |
| 47 def getControl(self): | 48 def getControl(self): |
| 48 return self.control | 49 return self.control |
| 49 | 50 |
| 51 def findNearestOrientation(initialPosition,prototypeTrajectory): | |
| 52 distances=[] | |
| 53 for p in prototypeTrajectory.positions: | |
| 54 distances.append(moving.Point.distanceNorm2(initialPosition, p)) | |
| 55 t=np.argmin(distances) | |
| 56 return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle | |
| 57 #todo: merge with previous function | |
| 58 def findNearestInstant(initialPosition,prototypeTrajectory): | |
| 59 distances=[] | |
| 60 for p in prototypeTrajectory.positions: | |
| 61 distances.append(moving.Point.distanceNorm2(initialPosition, p)) | |
| 62 t=np.argmin(distances) | |
| 63 return t | |
| 64 | |
| 50 class PredictedTrajectoryPrototype(PredictedTrajectory): | 65 class PredictedTrajectoryPrototype(PredictedTrajectory): |
| 51 '''Predicted trajectory that follows a prototype trajectory | 66 '''Predicted trajectory that follows a prototype trajectory |
| 52 The prototype is in the format of a moving.Trajectory: it could be | 67 The prototype is in the format of a moving.Trajectory: it could be |
| 53 1. an observed trajectory (extracted from video) | 68 1. an observed trajectory (extracted from video) |
| 54 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow | 69 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow |
| 57 1. at constant speed (the instantaneous user speed) | 72 1. at constant speed (the instantaneous user speed) |
| 58 2. following the trajectory path, at the speed of the user | 73 2. following the trajectory path, at the speed of the user |
| 59 (applying a constant ratio equal | 74 (applying a constant ratio equal |
| 60 to the ratio of the user instantaneous speed and the trajectory closest speed)''' | 75 to the ratio of the user instantaneous speed and the trajectory closest speed)''' |
| 61 | 76 |
| 62 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): | 77 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.): |
| 63 self.prototypeTrajectory = prototypeTrajectory | 78 self.prototypeTrajectory = prototypeTrajectory |
| 64 self.constantSpeed = constantSpeed | 79 self.constantSpeed = constantSpeed |
| 80 self.speedList = speedList | |
| 65 self.probability = probability | 81 self.probability = probability |
| 66 self.predictedPositions = {0: initialPosition} | 82 self.predictedPositions = {0: initialPosition} |
| 67 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 83 self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)} |
| 68 | 84 |
| 69 def predictPosition(self, nTimeSteps): | 85 def predictPosition(self, nTimeSteps): |
| 70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): | 86 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): |
| 71 if constantSpeed: | 87 if self.constantSpeed: |
| 72 # calculate cumulative distance | 88 # calculate cumulative distance |
| 73 pass | 89 speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm |
| 90 anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) | |
| 91 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) | |
| 92 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
| 93 #pass | |
| 94 elif self.speedList!=[]: | |
| 95 pass | |
| 74 else: # see c++ code, calculate ratio | 96 else: # see c++ code, calculate ratio |
| 75 pass | 97 speedNorm= self.predictedSpeedOrientations[0].norm |
| 98 instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory) | |
| 99 prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] | |
| 100 ratio=float(speedNorm)/prototypeSpeeds[0] | |
| 101 resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] | |
| 102 anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) | |
| 103 if nTimeSteps<len(resampledSpeeds): | |
| 104 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype) | |
| 105 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
| 106 else: | |
| 107 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype) | |
| 108 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
| 109 # pass | |
| 76 return self.predictedPositions[nTimeSteps] | 110 return self.predictedPositions[nTimeSteps] |
| 77 | 111 |
| 78 class PredictedTrajectoryRandomControl(PredictedTrajectory): | 112 class PredictedTrajectoryRandomControl(PredictedTrajectory): |
| 79 '''Random vehicle control: suitable for normal adaptation''' | 113 '''Random vehicle control: suitable for normal adaptation''' |
| 80 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | 114 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): |
| 133 return '{0} {1}'.format(self.name, self.maxSpeed) | 167 return '{0} {1}'.format(self.name, self.maxSpeed) |
| 134 | 168 |
| 135 def generatePredictedTrajectories(self, obj, instant): | 169 def generatePredictedTrajectories(self, obj, instant): |
| 136 return [] | 170 return [] |
| 137 | 171 |
| 138 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | 172 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): |
| 139 '''returns the lists of collision points and crossing zones''' | 173 '''returns the lists of collision points and crossing zones''' |
| 140 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) | 174 if prototypeTrajectories1==None: |
| 141 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | 175 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) |
| 176 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | |
| 177 else: | |
| 178 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) | |
| 179 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) | |
| 142 | 180 |
| 143 collisionPoints = [] | 181 collisionPoints = [] |
| 144 crossingZones = [] | 182 crossingZones = [] |
| 145 for et1 in predictedTrajectories1: | 183 for et1 in predictedTrajectories1: |
| 146 for et2 in predictedTrajectories2: | 184 for et2 in predictedTrajectories2: |
| 159 while not cz and t2 < timeHorizon: | 197 while not cz and t2 < timeHorizon: |
| 160 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | 198 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: |
| 161 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | 199 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) |
| 162 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | 200 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) |
| 163 if cz: | 201 if cz: |
| 164 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | 202 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() |
| 203 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) | |
| 165 t2 += 1 | 204 t2 += 1 |
| 166 t1 += 1 | 205 t1 += 1 |
| 167 | 206 |
| 168 if debug: | 207 if debug: |
| 169 from matplotlib.pyplot import figure, axis, title | 208 from matplotlib.pyplot import figure, axis, title |
| 180 title('instant {0}'.format(currentInstant)) | 219 title('instant {0}'.format(currentInstant)) |
| 181 axis('equal') | 220 axis('equal') |
| 182 | 221 |
| 183 return collisionPoints, crossingZones | 222 return collisionPoints, crossingZones |
| 184 | 223 |
| 185 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | 224 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None): |
| 186 '''Computes all crossing and collision points at each common instant for two road users. ''' | 225 '''Computes all crossing and collision points at each common instant for two road users. ''' |
| 187 collisionPoints={} | 226 collisionPoints={} |
| 188 crossingZones={} | 227 crossingZones={} |
| 189 if timeInterval: | 228 if timeInterval: |
| 190 commonTimeInterval = timeInterval | 229 commonTimeInterval = timeInterval |
| 191 else: | 230 else: |
| 192 commonTimeInterval = obj1.commonTimeInterval(obj2) | 231 commonTimeInterval = obj1.commonTimeInterval(obj2) |
| 193 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | 232 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors |
| 194 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | 233 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) |
| 195 | 234 |
| 196 return collisionPoints, crossingZones | 235 return collisionPoints, crossingZones |
| 197 | 236 |
| 198 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | 237 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): |
| 199 '''Computes only collision probabilities | 238 '''Computes only collision probabilities |
| 410 return [],[] | 449 return [],[] |
| 411 | 450 |
| 412 #### | 451 #### |
| 413 # Other Methods | 452 # Other Methods |
| 414 #### | 453 #### |
| 415 | 454 class prototypePredictionParameters(PredictionParameters): |
| 416 | 455 def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]): |
| 417 | 456 #TODO speed profiles integration |
| 418 | 457 name = 'prototype' |
| 458 PredictionParameters.__init__(self, name, maxSpeed) | |
| 459 self.nPredictedTrajectories = nPredictedTrajectories | |
| 460 #self.prototypeTrajectory = prototypeTrajectory | |
| 461 self.constantSpeed = constantSpeed | |
| 462 self.speedList = speedList | |
| 463 #self.probability=probability | |
| 464 | |
| 465 def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories): | |
| 466 predictedTrajectories = [] | |
| 467 initialPosition = obj.getPositionAtInstant(instant) | |
| 468 initialVelocity = obj.getVelocityAtInstant(instant) | |
| 469 for prototypeTraj in prototypeTrajectories.keys(): | |
| 470 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj])) | |
| 471 return predictedTrajectories | |
| 419 | 472 |
| 420 if __name__ == "__main__": | 473 if __name__ == "__main__": |
| 421 import doctest | 474 import doctest |
| 422 import unittest | 475 import unittest |
| 423 suite = doctest.DocFileSuite('tests/prediction.txt') | 476 suite = doctest.DocFileSuite('tests/prediction.txt') |
