Mercurial > hg > nsaunier > traffic-intelligence
comparison python/prediction.py @ 614:5e09583275a4
Merged Nicolas/trafficintelligence into default
| author | Mohamed Gomaa <eng.m.gom3a@gmail.com> |
|---|---|
| date | Fri, 05 Dec 2014 12:13:53 -0500 |
| parents | a9c1d61a89b4 |
| children | 84690dfe5560 |
comparison
equal
deleted
inserted
replaced
| 598:11f96bd08552 | 614:5e09583275a4 |
|---|---|
| 28 return moving.Trajectory.fromPointList(self.predictedPositions.values()) | 28 return moving.Trajectory.fromPointList(self.predictedPositions.values()) |
| 29 | 29 |
| 30 def getPredictedSpeeds(self): | 30 def getPredictedSpeeds(self): |
| 31 return [so.norm for so in self.predictedSpeedOrientations.values()] | 31 return [so.norm for so in self.predictedSpeedOrientations.values()] |
| 32 | 32 |
| 33 def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): | 33 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): |
| 34 self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) | 34 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs) |
| 35 | 35 |
| 36 class PredictedTrajectoryConstant(PredictedTrajectory): | 36 class PredictedTrajectoryConstant(PredictedTrajectory): |
| 37 '''Predicted trajectory at constant speed or acceleration | 37 '''Predicted trajectory at constant speed or acceleration |
| 38 TODO generalize by passing a series of velocities/accelerations''' | 38 TODO generalize by passing a series of velocities/accelerations''' |
| 39 | 39 |
| 40 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None): | 40 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None): |
| 41 self.control = control | 41 self.control = control |
| 42 self.maxSpeed = maxSpeed | 42 self.maxSpeed = maxSpeed |
| 43 self.probability = probability | 43 self.probability = probability |
| 44 self.predictedPositions = {0: initialPosition} | 44 self.predictedPositions = {0: initialPosition} |
| 45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
| 46 | 46 |
| 47 def getControl(self): | 47 def getControl(self): |
| 48 return self.control | 48 return self.control |
| 49 | 49 |
| 50 class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): | 50 class PredictedTrajectoryPrototype(PredictedTrajectory): |
| 51 '''Random small adaptation of vehicle control ''' | 51 '''Predicted trajectory that follows a prototype trajectory |
| 52 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None): | 52 The prototype is in the format of a moving.Trajectory: it could be |
| 53 1. an observed trajectory (extracted from video) | |
| 54 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow | |
| 55 | |
| 56 Prediction can be done | |
| 57 1. at constant speed (the instantaneous user speed) | |
| 58 2. following the trajectory path, at the speed of the user | |
| 59 (applying a constant ratio equal | |
| 60 to the ratio of the user instantaneous speed and the trajectory closest speed)''' | |
| 61 | |
| 62 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): | |
| 63 self.prototypeTrajectory = prototypeTrajectory | |
| 64 self.constantSpeed = constantSpeed | |
| 65 self.probability = probability | |
| 66 self.predictedPositions = {0: initialPosition} | |
| 67 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
| 68 | |
| 69 def predictPosition(self, nTimeSteps): | |
| 70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): | |
| 71 if constantSpeed: | |
| 72 # calculate cumulative distance | |
| 73 pass | |
| 74 else: # see c++ code, calculate ratio | |
| 75 pass | |
| 76 return self.predictedPositions[nTimeSteps] | |
| 77 | |
| 78 class PredictedTrajectoryRandomControl(PredictedTrajectory): | |
| 79 '''Random vehicle control: suitable for normal adaptation''' | |
| 80 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | |
| 53 '''Constructor | 81 '''Constructor |
| 54 accelerationDistribution and steeringDistribution are distributions | 82 accelerationDistribution and steeringDistribution are distributions |
| 55 that return random numbers drawn from them''' | 83 that return random numbers drawn from them''' |
| 56 self.accelerationDistribution = accelerationDistribution | 84 self.accelerationDistribution = accelerationDistribution |
| 57 self.steeringDistribution = steeringDistribution | 85 self.steeringDistribution = steeringDistribution |
| 61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 89 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
| 62 | 90 |
| 63 def getControl(self): | 91 def getControl(self): |
| 64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) | 92 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) |
| 65 | 93 |
| 66 class PredictionParameters: | |
| 67 def __init__(self, name, maxSpeed): | |
| 68 self.name = name | |
| 69 self.maxSpeed = maxSpeed | |
| 70 | |
| 71 def __str__(self): | |
| 72 return '{0} {1}'.format(self.name, self.maxSpeed) | |
| 73 | |
| 74 class ConstantPredictionParameters(PredictionParameters): | |
| 75 def __init__(self, maxSpeed): | |
| 76 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | |
| 77 | |
| 78 def generatePredictedTrajectories(self, obj, instant): | |
| 79 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
| 80 maxSpeed = self.maxSpeed)] | |
| 81 | |
| 82 class NormalAdaptationPredictionParameters(PredictionParameters): | |
| 83 def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering): | |
| 84 PredictionParameters.__init__(self, 'normal adaptation', maxSpeed) | |
| 85 self.nPredictedTrajectories = nPredictedTrajectories | |
| 86 self.maxAcceleration = maxAcceleration | |
| 87 self.maxSteering = maxSteering | |
| 88 self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, | |
| 89 self.maxAcceleration, 0.) | |
| 90 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
| 91 self.maxSteering, 0.) | |
| 92 | |
| 93 def __str__(self): | |
| 94 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, | |
| 95 self.maxAcceleration, | |
| 96 self.maxSteering) | |
| 97 | |
| 98 def generatePredictedTrajectories(self, obj, instant): | |
| 99 predictedTrajectories = [] | |
| 100 for i in xrange(self.nPredictedTrajectories): | |
| 101 predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), | |
| 102 obj.getVelocityAtInstant(instant), | |
| 103 self.accelerationDistribution, | |
| 104 self.steeringDistribution, | |
| 105 maxSpeed = self.maxSpeed)) | |
| 106 return predictedTrajectories | |
| 107 | |
| 108 class PointSetPredictionParameters(PredictionParameters): | |
| 109 # todo generate several trajectories with normal adaptatoins from each position (feature) | |
| 110 def __init__(self, maxSpeed): | |
| 111 PredictionParameters.__init__(self, 'point set', maxSpeed) | |
| 112 | |
| 113 def generatePredictedTrajectories(self, obj, instant): | |
| 114 predictedTrajectories = [] | |
| 115 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 116 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 117 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 118 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 119 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, | |
| 120 maxSpeed = self.maxSpeed)) | |
| 121 return predictedTrajectories | |
| 122 | |
| 123 class EvasiveActionPredictionParameters(PredictionParameters): | |
| 124 def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): | |
| 125 if useFeatures: | |
| 126 name = 'point set evasive action' | |
| 127 else: | |
| 128 name = 'evasive action' | |
| 129 PredictionParameters.__init__(self, name, maxSpeed) | |
| 130 self.nPredictedTrajectories = nPredictedTrajectories | |
| 131 self.minAcceleration = minAcceleration | |
| 132 self.maxAcceleration = maxAcceleration | |
| 133 self.maxSteering = maxSteering | |
| 134 self.useFeatures = useFeatures | |
| 135 self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, | |
| 136 self.maxAcceleration, 0.) | |
| 137 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
| 138 self.maxSteering, 0.) | |
| 139 | |
| 140 def __str__(self): | |
| 141 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
| 142 | |
| 143 def generatePredictedTrajectories(self, obj, instant): | |
| 144 predictedTrajectories = [] | |
| 145 if self.useFeatures: | |
| 146 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 147 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 148 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 149 else: | |
| 150 positions = [obj.getPositionAtInstant(instant)] | |
| 151 velocities = [obj.getVelocityAtInstant(instant)] | |
| 152 for i in xrange(self.nPredictedTrajectories): | |
| 153 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 154 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, | |
| 155 initialVelocity, | |
| 156 moving.NormAngle(self.accelerationDistribution(), | |
| 157 self.steeringDistribution()), | |
| 158 maxSpeed = self.maxSpeed)) | |
| 159 return predictedTrajectories | |
| 160 | |
| 161 class SafetyPoint(moving.Point): | 94 class SafetyPoint(moving.Point): |
| 162 '''Can represent a collision point or crossing zone | 95 '''Can represent a collision point or crossing zone |
| 163 with respective safety indicator, TTC or pPET''' | 96 with respective safety indicator, TTC or pPET''' |
| 164 def __init__(self, p, probability = 1., indicator = -1): | 97 def __init__(self, p, probability = 1., indicator = -1): |
| 165 self.x = p.x | 98 self.x = p.x |
| 173 @staticmethod | 106 @staticmethod |
| 174 def save(out, points, predictionInstant, objNum1, objNum2): | 107 def save(out, points, predictionInstant, objNum1, objNum2): |
| 175 for p in points: | 108 for p in points: |
| 176 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) | 109 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) |
| 177 | 110 |
| 178 def computeExpectedIndicator(points): | 111 @staticmethod |
| 179 from numpy import sum | 112 def computeExpectedIndicator(points): |
| 180 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | 113 from numpy import sum |
| 114 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | |
| 181 | 115 |
| 182 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | 116 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): |
| 183 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' | 117 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' |
| 184 t = 1 | 118 t = 1 |
| 185 p1 = predictedTrajectory1.predictPosition(t) | 119 p1 = predictedTrajectory1.predictPosition(t) |
| 188 p1 = predictedTrajectory1.predictPosition(t) | 122 p1 = predictedTrajectory1.predictPosition(t) |
| 189 p2 = predictedTrajectory2.predictPosition(t) | 123 p2 = predictedTrajectory2.predictPosition(t) |
| 190 t += 1 | 124 t += 1 |
| 191 return t, p1, p2 | 125 return t, p1, p2 |
| 192 | 126 |
| 193 def computeCrossingsCollisionsAtInstant(currentInstant, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | 127 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon): |
| 194 '''returns the lists of collision points and crossing zones | 128 from matplotlib.pyplot import figure, axis, title, close, savefig |
| 195 | 129 figure() |
| 196 Check: Predicting all the points together, as if they represent the whole vehicle''' | 130 for et in predictedTrajectories1: |
| 197 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, currentInstant) | 131 et.predictPosition(timeHorizon) |
| 198 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, currentInstant) | 132 et.plot('rx') |
| 133 | |
| 134 for et in predictedTrajectories2: | |
| 135 et.predictPosition(timeHorizon) | |
| 136 et.plot('bx') | |
| 137 obj1.plot('r') | |
| 138 obj2.plot('b') | |
| 139 title('instant {0}'.format(currentInstant)) | |
| 140 axis('equal') | |
| 141 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) | |
| 142 close() | |
| 143 | |
| 144 def computeCrossingsCollisionsAtInstant(predictionParams, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
| 145 '''returns the lists of collision points and crossing zones''' | |
| 146 predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant) | |
| 147 predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant) | |
| 199 | 148 |
| 200 collisionPoints = [] | 149 collisionPoints = [] |
| 201 crossingZones = [] | 150 crossingZones = [] |
| 202 for et1 in predictedTrajectories1: | 151 for et1 in predictedTrajectories1: |
| 203 for et2 in predictedTrajectories2: | 152 for et2 in predictedTrajectories2: |
| 204 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | 153 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) |
| 205 | 154 |
| 206 if t <= timeHorizon: | 155 if t <= timeHorizon: |
| 207 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) | 156 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) |
| 208 elif computeCZ: # check if there is a crossing zone | 157 elif computeCZ: # check if there is a crossing zone |
| 209 # TODO? zone should be around the points at which the traj are the closest | 158 # TODO? zone should be around the points at which the traj are the closest |
| 210 # look for CZ at different times, otherwise it would be a collision | 159 # look for CZ at different times, otherwise it would be a collision |
| 215 t2 = 0 | 164 t2 = 0 |
| 216 while not cz and t2 < timeHorizon: | 165 while not cz and t2 < timeHorizon: |
| 217 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | 166 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: |
| 218 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | 167 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) |
| 219 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | 168 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) |
| 220 if cz: | 169 if cz != None: |
| 221 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | 170 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) |
| 222 t2 += 1 | 171 t2 += 1 |
| 223 t1 += 1 | 172 t1 += 1 |
| 224 | 173 |
| 225 if debug: | 174 if debug: |
| 226 from matplotlib.pyplot import figure, axis, title | 175 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) |
| 227 figure() | 176 |
| 228 for et in predictedTrajectories1: | 177 return currentInstant, collisionPoints, crossingZones |
| 229 et.predictPosition(timeHorizon) | 178 |
| 230 et.draw('rx') | 179 def computeCrossingsCollisions(predictionParams, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): |
| 231 | |
| 232 for et in predictedTrajectories2: | |
| 233 et.predictPosition(timeHorizon) | |
| 234 et.draw('bx') | |
| 235 obj1.draw('r') | |
| 236 obj2.draw('b') | |
| 237 title('instant {0}'.format(i)) | |
| 238 axis('equal') | |
| 239 | |
| 240 return collisionPoints, crossingZones | |
| 241 | |
| 242 def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
| 243 '''Computes all crossing and collision points at each common instant for two road users. ''' | 180 '''Computes all crossing and collision points at each common instant for two road users. ''' |
| 244 collisionPoints={} | 181 collisionPoints={} |
| 245 crossingZones={} | 182 crossingZones={} |
| 246 if timeInterval: | 183 if timeInterval: |
| 247 commonTimeInterval = timeInterval | 184 commonTimeInterval = timeInterval |
| 248 else: | 185 else: |
| 249 commonTimeInterval = obj1.commonTimeInterval(obj2) | 186 commonTimeInterval = obj1.commonTimeInterval(obj2) |
| 250 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | 187 if nProcesses == 1: |
| 251 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | 188 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors |
| 252 | 189 i, cp, cz = computeCrossingsCollisionsAtInstant(predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) |
| 190 if len(cp) != 0: | |
| 191 collisionPoints[i] = cp | |
| 192 if len(cz) != 0: | |
| 193 crossingZones[i] = cz | |
| 194 else: | |
| 195 from multiprocessing import Pool | |
| 196 pool = Pool(processes = nProcesses) | |
| 197 jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]] | |
| 198 #results = [j.get() for j in jobs] | |
| 199 #results.sort() | |
| 200 for j in jobs: | |
| 201 i, cp, cz = j.get() | |
| 202 #if len(cp) != 0 or len(cz) != 0: | |
| 203 if len(cp) != 0: | |
| 204 collisionPoints[i] = cp | |
| 205 if len(cz) != 0: | |
| 206 crossingZones[i] = cz | |
| 207 pool.close() | |
| 253 return collisionPoints, crossingZones | 208 return collisionPoints, crossingZones |
| 254 | 209 |
| 255 def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | 210 class PredictionParameters: |
| 256 '''Computes only collision probabilities | 211 def __init__(self, name, maxSpeed): |
| 257 Returns for each instant the collision probability and number of samples drawn''' | 212 self.name = name |
| 258 collisionProbabilities = {} | 213 self.maxSpeed = maxSpeed |
| 259 if timeInterval: | 214 |
| 260 commonTimeInterval = timeInterval | 215 def __str__(self): |
| 261 else: | 216 return '{0} {1}'.format(self.name, self.maxSpeed) |
| 262 commonTimeInterval = obj1.commonTimeInterval(obj2) | 217 |
| 263 for i in list(commonTimeInterval)[:-1]: | 218 def generatePredictedTrajectories(self, obj, instant): |
| 264 nCollisions = 0 | 219 return [] |
| 265 print(obj1.num, obj2.num, i) | 220 |
| 266 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) | 221 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): |
| 267 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) | 222 return computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) |
| 268 for et1 in predictedTrajectories1: | 223 |
| 269 for et2 in predictedTrajectories2: | 224 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): |
| 270 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | 225 return computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval, nProcesses) |
| 271 if t <= timeHorizon: | 226 |
| 272 nCollisions += 1 | 227 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): |
| 273 # take into account probabilities ?? | 228 '''Computes only collision probabilities |
| 274 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | 229 Returns for each instant the collision probability and number of samples drawn''' |
| 275 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] | 230 collisionProbabilities = {} |
| 276 | 231 if timeInterval: |
| 277 if debug: | 232 commonTimeInterval = timeInterval |
| 278 from matplotlib.pyplot import figure, axis, title | 233 else: |
| 234 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
| 235 for i in list(commonTimeInterval)[:-1]: | |
| 236 nCollisions = 0 | |
| 237 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) | |
| 238 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) | |
| 239 for et1 in predictedTrajectories1: | |
| 240 for et2 in predictedTrajectories2: | |
| 241 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
| 242 if t <= timeHorizon: | |
| 243 nCollisions += 1 | |
| 244 # take into account probabilities ?? | |
| 245 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | |
| 246 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] | |
| 247 | |
| 248 if debug: | |
| 249 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | |
| 250 | |
| 251 return collisionProbabilities | |
| 252 | |
| 253 class ConstantPredictionParameters(PredictionParameters): | |
| 254 def __init__(self, maxSpeed): | |
| 255 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | |
| 256 | |
| 257 def generatePredictedTrajectories(self, obj, instant): | |
| 258 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
| 259 maxSpeed = self.maxSpeed)] | |
| 260 | |
| 261 class NormalAdaptationPredictionParameters(PredictionParameters): | |
| 262 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | |
| 263 '''An example of acceleration and steering distributions is | |
| 264 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) | |
| 265 ''' | |
| 266 if useFeatures: | |
| 267 name = 'point set normal adaptation' | |
| 268 else: | |
| 269 name = 'normal adaptation' | |
| 270 PredictionParameters.__init__(self, name, maxSpeed) | |
| 271 self.nPredictedTrajectories = nPredictedTrajectories | |
| 272 self.useFeatures = useFeatures | |
| 273 self.accelerationDistribution = accelerationDistribution | |
| 274 self.steeringDistribution = steeringDistribution | |
| 275 | |
| 276 def __str__(self): | |
| 277 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, | |
| 278 self.maxAcceleration, | |
| 279 self.maxSteering) | |
| 280 | |
| 281 def generatePredictedTrajectories(self, obj, instant): | |
| 282 predictedTrajectories = [] | |
| 283 if self.useFeatures: | |
| 284 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 285 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 286 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 287 else: | |
| 288 positions = [obj.getPositionAtInstant(instant)] | |
| 289 velocities = [obj.getVelocityAtInstant(instant)] | |
| 290 for i in xrange(self.nPredictedTrajectories): | |
| 291 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 292 predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition, | |
| 293 initialVelocity, | |
| 294 self.accelerationDistribution, | |
| 295 self.steeringDistribution, | |
| 296 maxSpeed = self.maxSpeed)) | |
| 297 return predictedTrajectories | |
| 298 | |
| 299 class PointSetPredictionParameters(PredictionParameters): | |
| 300 # todo generate several trajectories with normal adaptatoins from each position (feature) | |
| 301 def __init__(self, maxSpeed): | |
| 302 PredictionParameters.__init__(self, 'point set', maxSpeed) | |
| 303 #self.nPredictedTrajectories = nPredictedTrajectories | |
| 304 | |
| 305 def generatePredictedTrajectories(self, obj, instant): | |
| 306 predictedTrajectories = [] | |
| 307 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 308 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 309 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 310 #for i in xrange(self.nPredictedTrajectories): | |
| 311 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 312 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, | |
| 313 maxSpeed = self.maxSpeed)) | |
| 314 return predictedTrajectories | |
| 315 | |
| 316 class EvasiveActionPredictionParameters(PredictionParameters): | |
| 317 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | |
| 318 '''Suggested acceleration distribution may not be symmetric, eg | |
| 319 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)''' | |
| 320 | |
| 321 if useFeatures: | |
| 322 name = 'point set evasive action' | |
| 323 else: | |
| 324 name = 'evasive action' | |
| 325 PredictionParameters.__init__(self, name, maxSpeed) | |
| 326 self.nPredictedTrajectories = nPredictedTrajectories | |
| 327 self.useFeatures = useFeatures | |
| 328 self.accelerationDistribution = accelerationDistribution | |
| 329 self.steeringDistribution = steeringDistribution | |
| 330 | |
| 331 def __str__(self): | |
| 332 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
| 333 | |
| 334 def generatePredictedTrajectories(self, obj, instant): | |
| 335 predictedTrajectories = [] | |
| 336 if self.useFeatures: | |
| 337 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 338 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 339 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 340 else: | |
| 341 positions = [obj.getPositionAtInstant(instant)] | |
| 342 velocities = [obj.getVelocityAtInstant(instant)] | |
| 343 for i in xrange(self.nPredictedTrajectories): | |
| 344 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 345 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, | |
| 346 initialVelocity, | |
| 347 moving.NormAngle(self.accelerationDistribution(), | |
| 348 self.steeringDistribution()), | |
| 349 maxSpeed = self.maxSpeed)) | |
| 350 return predictedTrajectories | |
| 351 | |
| 352 | |
| 353 class CVDirectPredictionParameters(PredictionParameters): | |
| 354 '''Prediction parameters of prediction at constant velocity | |
| 355 using direct computation of the intersecting point''' | |
| 356 | |
| 357 def __init__(self): | |
| 358 PredictionParameters.__init__(self, 'constant velocity (direct computation)', None) | |
| 359 | |
| 360 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
| 361 collisionPoints = [] | |
| 362 crossingZones = [] | |
| 363 | |
| 364 p1 = obj1.getPositionAtInstant(currentInstant) | |
| 365 p2 = obj2.getPositionAtInstant(currentInstant) | |
| 366 if (p1-p2).norm2() <= collisionDistanceThreshold: | |
| 367 collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)] | |
| 368 else: | |
| 369 v1 = obj1.getVelocityAtInstant(currentInstant) | |
| 370 v2 = obj2.getVelocityAtInstant(currentInstant) | |
| 371 intersection = moving.intersection(p1, p2, v1, v2) | |
| 372 | |
| 373 if intersection != None: | |
| 374 dp1 = intersection-p1 | |
| 375 dp2 = intersection-p2 | |
| 376 if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection | |
| 377 dist1 = dp1.norm2() | |
| 378 dist2 = dp2.norm2() | |
| 379 s1 = v1.norm2() | |
| 380 s2 = v2.norm2() | |
| 381 halfCollisionDistanceThreshold = collisionDistanceThreshold/2. | |
| 382 timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) | |
| 383 timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) | |
| 384 collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) | |
| 385 if computeCZ and collisionTimeInterval.empty(): | |
| 386 crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] | |
| 387 else: | |
| 388 collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] | |
| 389 | |
| 390 if debug and intersection!= None: | |
| 391 from matplotlib.pyplot import plot, figure, axis, title | |
| 279 figure() | 392 figure() |
| 280 for et in predictedTrajectories1: | 393 plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') |
| 281 et.predictPosition(timeHorizon) | 394 plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') |
| 282 et.draw('rx') | 395 intersection.plot() |
| 283 | 396 obj1.plot('r') |
| 284 for et in predictedTrajectories2: | 397 obj2.plot('b') |
| 285 et.predictPosition(timeHorizon) | 398 title('instant {0}'.format(currentInstant)) |
| 286 et.draw('bx') | |
| 287 obj1.draw('r') | |
| 288 obj2.draw('b') | |
| 289 title('instant {0}'.format(i)) | |
| 290 axis('equal') | 399 axis('equal') |
| 291 | 400 |
| 292 return collisionProbabilities | 401 return collisionPoints, crossingZones |
| 402 | |
| 403 class CVExactPredictionParameters(PredictionParameters): | |
| 404 '''Prediction parameters of prediction at constant velocity | |
| 405 using direct computation of the intersecting point (solving for the equation''' | |
| 406 | |
| 407 def __init__(self): | |
| 408 PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None) | |
| 409 | |
| 410 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
| 411 'TODO add collision point coordinates, compute pPET' | |
| 412 #collisionPoints = [] | |
| 413 #crossingZones = [] | |
| 414 | |
| 415 p1 = obj1.getPositionAtInstant(currentInstant) | |
| 416 p2 = obj2.getPositionAtInstant(currentInstant) | |
| 417 v1 = obj1.getVelocityAtInstant(currentInstant) | |
| 418 v2 = obj2.getVelocityAtInstant(currentInstant) | |
| 419 intersection = moving.intersection(p1, p2, v1, v2) | |
| 420 | |
| 421 if intersection != None: | |
| 422 ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold) | |
| 423 if ttc: | |
| 424 return [SafetyPoint(intersection, 1., ttc)], [] # (p1+v1.multiply(ttc)+p2+v2.multiply(ttc)).multiply(0.5) | |
| 425 else: | |
| 426 return [],[] | |
| 427 | |
| 428 #### | |
| 429 # Other Methods | |
| 430 #### | |
| 431 | |
| 432 | |
| 433 | |
| 293 | 434 |
| 294 | 435 |
| 295 if __name__ == "__main__": | 436 if __name__ == "__main__": |
| 296 import doctest | 437 import doctest |
| 297 import unittest | 438 import unittest |
