Mercurial > hg > nsaunier > traffic-intelligence
comparison python/prediction.py @ 271:bbd9c09e6869
changed the names to prediction methods and predicted trajectories
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Thu, 09 Aug 2012 15:18:20 -0400 |
| parents | python/extrapolation.py@05c9b0cb8202 |
| children | e56c34c1ebac |
comparison
equal
deleted
inserted
replaced
| 270:05c9b0cb8202 | 271:bbd9c09e6869 |
|---|---|
| 1 #! /usr/bin/env python | |
| 2 '''Library for motion prediction methods''' | |
| 3 | |
| 4 import moving | |
| 5 import math | |
| 6 import random | |
| 7 | |
| 8 class PredictedTrajectory: | |
| 9 '''Class for predicted trajectories with lazy evaluation | |
| 10 if the predicted position has not been already computed, compute it | |
| 11 | |
| 12 it should also have a probability''' | |
| 13 | |
| 14 def __init__(self): | |
| 15 self.probability = 0. | |
| 16 self.predictedPositions = {} | |
| 17 self.predictedSpeedOrientations = {} | |
| 18 self.collisionPoints = {} | |
| 19 self.crossingZones = {} | |
| 20 | |
| 21 def predictPosition(self, nTimeSteps): | |
| 22 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): | |
| 23 self.predictPosition(nTimeSteps-1) | |
| 24 self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) | |
| 25 return self.predictedPositions[nTimeSteps] | |
| 26 | |
| 27 def getPredictedTrajectory(self): | |
| 28 return moving.Trajectory.fromPointList(self.predictedPositions.values()) | |
| 29 | |
| 30 def getPredictedSpeeds(self): | |
| 31 return [so.norm for so in self.predictedSpeedOrientations.values()] | |
| 32 | |
| 33 def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): | |
| 34 self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) | |
| 35 | |
| 36 class PredictedTrajectoryConstant(PredictedTrajectory): | |
| 37 '''Predicted trajectory at constant speed or acceleration | |
| 38 TODO generalize by passing a series of velocities/accelerations''' | |
| 39 | |
| 40 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None): | |
| 41 self.control = control | |
| 42 self.maxSpeed = maxSpeed | |
| 43 self.probability = probability | |
| 44 self.predictedPositions = {0: initialPosition} | |
| 45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
| 46 | |
| 47 def getControl(self): | |
| 48 return self.control | |
| 49 | |
| 50 class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): | |
| 51 '''Random small adaptation of vehicle control ''' | |
| 52 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None): | |
| 53 '''Constructor | |
| 54 accelerationDistribution and steeringDistribution are distributions | |
| 55 that return random numbers drawn from them''' | |
| 56 self.accelerationDistribution = accelerationDistribution | |
| 57 self.steeringDistribution = steeringDistribution | |
| 58 self.maxSpeed = maxSpeed | |
| 59 self.probability = probability | |
| 60 self.predictedPositions = {0: initialPosition} | |
| 61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
| 62 | |
| 63 def getControl(self): | |
| 64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) | |
| 65 | |
| 66 class PredictionParameters: | |
| 67 def __init__(self, name, maxSpeed): | |
| 68 self.name = name | |
| 69 self.maxSpeed = maxSpeed | |
| 70 | |
| 71 def __str__(self): | |
| 72 return '{0} {1}'.format(self.name, self.maxSpeed) | |
| 73 | |
| 74 class ConstantPredictionParameters(PredictionParameters): | |
| 75 def __init__(self, maxSpeed): | |
| 76 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | |
| 77 | |
| 78 def generatePredictedTrajectories(self, obj, instant): | |
| 79 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
| 80 maxSpeed = self.maxSpeed)] | |
| 81 | |
| 82 class NormalAdaptationPredictionParameters(PredictionParameters): | |
| 83 def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering): | |
| 84 PredictionParameters.__init__(self, 'normal adaptation', maxSpeed) | |
| 85 self.nPredictedTrajectories = nPredictedTrajectories | |
| 86 self.maxAcceleration = maxAcceleration | |
| 87 self.maxSteering = maxSteering | |
| 88 self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, | |
| 89 self.maxAcceleration, 0.) | |
| 90 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
| 91 self.maxSteering, 0.) | |
| 92 | |
| 93 def __str__(self): | |
| 94 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, | |
| 95 self.maxAcceleration, | |
| 96 self.maxSteering) | |
| 97 | |
| 98 def generatePredictedTrajectories(self, obj, instant): | |
| 99 predictedTrajectories = [] | |
| 100 for i in xrange(self.nPredictedTrajectories): | |
| 101 predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), | |
| 102 obj.getVelocityAtInstant(instant), | |
| 103 self.accelerationDistribution, | |
| 104 self.steeringDistribution, | |
| 105 maxSpeed = self.maxSpeed)) | |
| 106 return predictedTrajectories | |
| 107 | |
| 108 class PointSetPredictionParameters(PredictionParameters): | |
| 109 # todo generate several trajectories with normal adaptatoins from each position (feature) | |
| 110 def __init__(self, maxSpeed): | |
| 111 PredictionParameters.__init__(self, 'point set', maxSpeed) | |
| 112 | |
| 113 def generatePredictedTrajectories(self, obj, instant): | |
| 114 predictedTrajectories = [] | |
| 115 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 116 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 117 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 118 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 119 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, | |
| 120 maxSpeed = self.maxSpeed)) | |
| 121 return predictedTrajectories | |
| 122 | |
| 123 class EvasiveActionPredictionParameters(PredictionParameters): | |
| 124 def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): | |
| 125 if useFeatures: | |
| 126 name = 'point set evasive action' | |
| 127 else: | |
| 128 name = 'evasive action' | |
| 129 PredictionParameters.__init__(self, name, maxSpeed) | |
| 130 self.nPredictedTrajectories = nPredictedTrajectories | |
| 131 self.minAcceleration = minAcceleration | |
| 132 self.maxAcceleration = maxAcceleration | |
| 133 self.maxSteering = maxSteering | |
| 134 self.useFeatures = useFeatures | |
| 135 self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, | |
| 136 self.maxAcceleration, 0.) | |
| 137 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
| 138 self.maxSteering, 0.) | |
| 139 | |
| 140 def __str__(self): | |
| 141 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
| 142 | |
| 143 def generatePredictedTrajectories(self, obj, instant): | |
| 144 predictedTrajectories = [] | |
| 145 if self.useFeatures: | |
| 146 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 147 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 148 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 149 else: | |
| 150 positions = [obj.getPositionAtInstant(instant)] | |
| 151 velocities = [obj.getVelocityAtInstant(instant)] | |
| 152 for i in xrange(self.nPredictedTrajectories): | |
| 153 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 154 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, | |
| 155 initialVelocity, | |
| 156 moving.NormAngle(self.accelerationDistribution(), | |
| 157 self.steeringDistribution()), | |
| 158 maxSpeed = self.maxSpeed)) | |
| 159 return predictedTrajectories | |
| 160 | |
| 161 class SafetyPoint(moving.Point): | |
| 162 '''Can represent a collision point or crossing zone | |
| 163 with respective safety indicator, TTC or pPET''' | |
| 164 def __init__(self, p, probability = 1., indicator = -1): | |
| 165 self.x = p.x | |
| 166 self.y = p.y | |
| 167 self.probability = probability | |
| 168 self.indicator = indicator | |
| 169 | |
| 170 @staticmethod | |
| 171 def save(out, points, objNum1, objNum2, instant): | |
| 172 for p in points: | |
| 173 out.write('{0} {1} {2} {3} {4} {5} {6}\n'.format(objNum1, objNum2, instant, p.x, p.y, p.probability, p.indicator)) | |
| 174 | |
| 175 def computeExpectedIndicator(points): | |
| 176 from numpy import sum | |
| 177 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | |
| 178 | |
| 179 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | |
| 180 t = 1 | |
| 181 p1 = predictedTrajectory1.predictPosition(t) | |
| 182 p2 = predictedTrajectory2.predictPosition(t) | |
| 183 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
| 184 p1 = predictedTrajectory1.predictPosition(t) | |
| 185 p2 = predictedTrajectory2.predictPosition(t) | |
| 186 t += 1 | |
| 187 return t, p1, p2 | |
| 188 | |
| 189 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False): | |
| 190 '''returns the lists of collision points and crossing zones | |
| 191 | |
| 192 Check: Predicting all the points together, as if they represent the whole vehicle''' | |
| 193 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) | |
| 194 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) | |
| 195 | |
| 196 collisionPoints = [] | |
| 197 crossingZones = [] | |
| 198 for et1 in predictedTrajectories1: | |
| 199 for et2 in predictedTrajectories2: | |
| 200 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
| 201 | |
| 202 if t <= timeHorizon: | |
| 203 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) | |
| 204 else: # check if there is a crossing zone | |
| 205 # TODO? zone should be around the points at which the traj are the closest | |
| 206 # look for CZ at different times, otherwise it would be a collision | |
| 207 # an approximation would be to look for close points at different times, ie the complementary of collision points | |
| 208 cz = None | |
| 209 t1 = 0 | |
| 210 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 | |
| 211 t2 = 0 | |
| 212 while not cz and t2 < timeHorizon: | |
| 213 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | |
| 214 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | |
| 215 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | |
| 216 if cz: | |
| 217 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | |
| 218 t2 += 1 | |
| 219 t1 += 1 | |
| 220 | |
| 221 if debug: | |
| 222 from matplotlib.pyplot import figure, axis, title | |
| 223 figure() | |
| 224 for et in predictedTrajectories1: | |
| 225 et.predictPosition(timeHorizon) | |
| 226 et.draw('rx') | |
| 227 | |
| 228 for et in predictedTrajectories2: | |
| 229 et.predictPosition(timeHorizon) | |
| 230 et.draw('bx') | |
| 231 obj1.draw('r') | |
| 232 obj2.draw('b') | |
| 233 title('instant {0}'.format(i)) | |
| 234 axis('equal') | |
| 235 | |
| 236 return collisionPoints, crossingZones | |
| 237 | |
| 238 def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): | |
| 239 collisionPoints={} | |
| 240 crossingZones={} | |
| 241 if timeInterval: | |
| 242 commonTimeInterval = timeInterval | |
| 243 else: | |
| 244 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
| 245 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | |
| 246 print(obj1.num, obj2.num, i) | |
| 247 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug) | |
| 248 SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) | |
| 249 SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) | |
| 250 | |
| 251 return collisionPoints, crossingZones | |
| 252 | |
| 253 def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): | |
| 254 collisionProbabilities = {} | |
| 255 if timeInterval: | |
| 256 commonTimeInterval = timeInterval | |
| 257 else: | |
| 258 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
| 259 for i in list(commonTimeInterval)[:-1]: | |
| 260 nCollisions = 0 | |
| 261 print(obj1.num, obj2.num, i) | |
| 262 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) | |
| 263 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) | |
| 264 for et1 in predictedTrajectories1: | |
| 265 for et2 in predictedTrajectories2: | |
| 266 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
| 267 if t <= timeHorizon: | |
| 268 nCollisions += 1 | |
| 269 # take into account probabilities ?? | |
| 270 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | |
| 271 collisionProbabilities[i] = float(nCollisions)/nSamples | |
| 272 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) | |
| 273 | |
| 274 if debug: | |
| 275 from matplotlib.pyplot import figure, axis, title | |
| 276 figure() | |
| 277 for et in predictedTrajectories1: | |
| 278 et.predictPosition(timeHorizon) | |
| 279 et.draw('rx') | |
| 280 | |
| 281 for et in predictedTrajectories2: | |
| 282 et.predictPosition(timeHorizon) | |
| 283 et.draw('bx') | |
| 284 obj1.draw('r') | |
| 285 obj2.draw('b') | |
| 286 title('instant {0}'.format(i)) | |
| 287 axis('equal') | |
| 288 | |
| 289 return collisionProbabilities | |
| 290 | |
| 291 | |
| 292 if __name__ == "__main__": | |
| 293 import doctest | |
| 294 import unittest | |
| 295 suite = doctest.DocFileSuite('tests/prediction.txt') | |
| 296 #suite = doctest.DocTestSuite() | |
| 297 unittest.TextTestRunner().run(suite) | |
| 298 #doctest.testmod() | |
| 299 #doctest.testfile("example.txt") | |
| 300 |
