Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/prediction.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Fri, 15 Jun 2018 11:19:10 -0400 |
| parents | python/prediction.py@933670761a57 |
| children | c6cf75a2ed08 |
comparison
equal
deleted
inserted
replaced
| 1027:6129296848d3 | 1028:cc5cb04b04b0 |
|---|---|
| 1 #! /usr/bin/env python | |
| 2 '''Library for motion prediction methods''' | |
| 3 | |
| 4 import moving | |
| 5 from utils import LCSS | |
| 6 | |
| 7 import math, random | |
| 8 from copy import copy | |
| 9 import numpy as np | |
| 10 #from multiprocessing import Pool | |
| 11 | |
| 12 | |
| 13 class PredictedTrajectory(object): | |
| 14 '''Class for predicted trajectories with lazy evaluation | |
| 15 if the predicted position has not been already computed, compute it | |
| 16 | |
| 17 it should also have a probability''' | |
| 18 | |
| 19 def __init__(self): | |
| 20 self.probability = 0. | |
| 21 self.predictedPositions = {} | |
| 22 self.predictedSpeedOrientations = {} | |
| 23 #self.collisionPoints = {} | |
| 24 #self.crossingZones = {} | |
| 25 | |
| 26 def predictPosition(self, nTimeSteps): | |
| 27 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions: | |
| 28 self.predictPosition(nTimeSteps-1) | |
| 29 self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) | |
| 30 return self.predictedPositions[nTimeSteps] | |
| 31 | |
| 32 def getPredictedTrajectory(self): | |
| 33 return moving.Trajectory.fromPointList(list(self.predictedPositions.values())) | |
| 34 | |
| 35 def getPredictedSpeeds(self): | |
| 36 return [so.norm for so in self.predictedSpeedOrientations.values()] | |
| 37 | |
| 38 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): | |
| 39 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs) | |
| 40 | |
| 41 class PredictedTrajectoryConstant(PredictedTrajectory): | |
| 42 '''Predicted trajectory at constant speed or acceleration | |
| 43 TODO generalize by passing a series of velocities/accelerations''' | |
| 44 | |
| 45 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None): | |
| 46 self.control = control | |
| 47 self.maxSpeed = maxSpeed | |
| 48 self.probability = probability | |
| 49 self.predictedPositions = {0: initialPosition} | |
| 50 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
| 51 | |
| 52 def getControl(self): | |
| 53 return self.control | |
| 54 | |
| 55 class PredictedTrajectoryPrototype(PredictedTrajectory): | |
| 56 '''Predicted trajectory that follows a prototype trajectory | |
| 57 The prototype is in the format of a moving.Trajectory: it could be | |
| 58 1. an observed trajectory (extracted from video) | |
| 59 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow | |
| 60 | |
| 61 Prediction can be done | |
| 62 1. at constant speed (the instantaneous user speed) | |
| 63 2. following the trajectory path, at the speed of the user | |
| 64 (applying a constant ratio equal | |
| 65 to the ratio of the user instantaneous speed and the trajectory closest speed)''' | |
| 66 | |
| 67 def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.): | |
| 68 ''' prototype is a MovingObject | |
| 69 | |
| 70 Prediction at constant speed will not work for unrealistic trajectories | |
| 71 that do not follow a slowly changing velocity (eg moving object trajectories, | |
| 72 but is good for realistic motion (eg features)''' | |
| 73 self.prototype = prototype | |
| 74 self.constantSpeed = constantSpeed | |
| 75 self.nFramesIgnore = nFramesIgnore | |
| 76 self.probability = probability | |
| 77 self.predictedPositions = {0: initialPosition} | |
| 78 self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) | |
| 79 self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position | |
| 80 self.theta = prototype.getVelocityAt(self.closestPointIdx).angle() | |
| 81 self.initialSpeed = initialVelocity.norm2() | |
| 82 if not constantSpeed: | |
| 83 self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() | |
| 84 | |
| 85 def predictPosition(self, nTimeSteps): | |
| 86 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions: | |
| 87 deltaPosition = copy(self.deltaPosition) | |
| 88 if self.constantSpeed: | |
| 89 traj = self.prototype.getPositions() | |
| 90 trajLength = traj.length() | |
| 91 traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx) | |
| 92 i = self.closestPointIdx | |
| 93 while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance: | |
| 94 i += 1 | |
| 95 if i == trajLength: | |
| 96 v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) | |
| 97 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) | |
| 98 else: | |
| 99 v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore)) | |
| 100 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) | |
| 101 else: | |
| 102 traj = self.prototype.getPositions() | |
| 103 trajLength = traj.length() | |
| 104 nSteps = self.ratio*nTimeSteps+self.closestPointIdx | |
| 105 i = int(np.floor(nSteps)) | |
| 106 if nSteps < trajLength-1: | |
| 107 v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore)) | |
| 108 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) | |
| 109 else: | |
| 110 v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) | |
| 111 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) | |
| 112 return self.predictedPositions[nTimeSteps] | |
| 113 | |
| 114 class PredictedTrajectoryRandomControl(PredictedTrajectory): | |
| 115 '''Random vehicle control: suitable for normal adaptation''' | |
| 116 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | |
| 117 '''Constructor | |
| 118 accelerationDistribution and steeringDistribution are distributions | |
| 119 that return random numbers drawn from them''' | |
| 120 self.accelerationDistribution = accelerationDistribution | |
| 121 self.steeringDistribution = steeringDistribution | |
| 122 self.maxSpeed = maxSpeed | |
| 123 self.probability = probability | |
| 124 self.predictedPositions = {0: initialPosition} | |
| 125 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
| 126 | |
| 127 def getControl(self): | |
| 128 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) | |
| 129 | |
| 130 class SafetyPoint(moving.Point): | |
| 131 '''Can represent a collision point or crossing zone | |
| 132 with respective safety indicator, TTC or pPET''' | |
| 133 def __init__(self, p, probability = 1., indicator = -1): | |
| 134 self.x = p.x | |
| 135 self.y = p.y | |
| 136 self.probability = probability | |
| 137 self.indicator = indicator | |
| 138 | |
| 139 def __str__(self): | |
| 140 return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) | |
| 141 | |
| 142 @staticmethod | |
| 143 def save(out, points, predictionInstant, objNum1, objNum2): | |
| 144 for p in points: | |
| 145 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) | |
| 146 | |
| 147 @staticmethod | |
| 148 def computeExpectedIndicator(points): | |
| 149 return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | |
| 150 | |
| 151 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | |
| 152 '''Computes the first instant | |
| 153 at which two predicted trajectories are within some distance threshold | |
| 154 Computes all the times including timeHorizon | |
| 155 | |
| 156 User has to check the first variable collision to know about a collision''' | |
| 157 t = 1 | |
| 158 p1 = predictedTrajectory1.predictPosition(t) | |
| 159 p2 = predictedTrajectory2.predictPosition(t) | |
| 160 collision = (p1-p2).norm2() <= collisionDistanceThreshold | |
| 161 while t < timeHorizon and not collision: | |
| 162 t += 1 | |
| 163 p1 = predictedTrajectory1.predictPosition(t) | |
| 164 p2 = predictedTrajectory2.predictPosition(t) | |
| 165 collision = (p1-p2).norm2() <= collisionDistanceThreshold | |
| 166 return collision, t, p1, p2 | |
| 167 | |
| 168 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True): | |
| 169 from matplotlib.pyplot import figure, axis, title, clf, savefig | |
| 170 if printFigure: | |
| 171 clf() | |
| 172 else: | |
| 173 figure() | |
| 174 for et in predictedTrajectories1: | |
| 175 for t in range(int(np.round(timeHorizon))): | |
| 176 et.predictPosition(t) | |
| 177 et.plot('rx') | |
| 178 for et in predictedTrajectories2: | |
| 179 for t in range(int(np.round(timeHorizon))): | |
| 180 et.predictPosition(t) | |
| 181 et.plot('bx') | |
| 182 obj1.plot('r', withOrigin = True) | |
| 183 obj2.plot('b', withOrigin = True) | |
| 184 title('instant {0}'.format(currentInstant)) | |
| 185 axis('equal') | |
| 186 if printFigure: | |
| 187 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) | |
| 188 | |
| 189 def calculateProbability(nMatching,similarity,objects): | |
| 190 sumFrequencies=sum([nMatching[p] for p in similarity]) | |
| 191 prototypeProbability={} | |
| 192 for i in similarity: | |
| 193 prototypeProbability[i]= similarity[i] * float(nMatching[i])/sumFrequencies | |
| 194 sumProbabilities= sum([prototypeProbability[p] for p in prototypeProbability]) | |
| 195 probabilities={} | |
| 196 for i in prototypeProbability: | |
| 197 probabilities[objects[i]]= float(prototypeProbability[i])/sumProbabilities | |
| 198 return probabilities | |
| 199 | |
| 200 def findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,spatialThreshold=1.0, delta=180): | |
| 201 ''' behaviour prediction first step''' | |
| 202 if route[0] not in noiseEntryNums: | |
| 203 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]] | |
| 204 elif route[1] not in noiseExitNums: | |
| 205 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]] | |
| 206 else: | |
| 207 prototypesRoutes=[x for x in sorted(prototypes.keys())] | |
| 208 lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) | |
| 209 similarity={} | |
| 210 for y in prototypesRoutes: | |
| 211 if y in prototypes: | |
| 212 prototypesIDs=prototypes[y] | |
| 213 for x in prototypesIDs: | |
| 214 s=lcss.computeNormalized(partialObjPositions, objects[x].positions) | |
| 215 if s >= minSimilarity: | |
| 216 similarity[x]=s | |
| 217 | |
| 218 if mostMatched==None: | |
| 219 probabilities= calculateProbability(nMatching,similarity,objects) | |
| 220 return probabilities | |
| 221 else: | |
| 222 mostMatchedValues=sorted(similarity.values(),reverse=True)[:mostMatched] | |
| 223 keys=[k for k in similarity if similarity[k] in mostMatchedValues] | |
| 224 newSimilarity={} | |
| 225 for i in keys: | |
| 226 newSimilarity[i]=similarity[i] | |
| 227 probabilities= calculateProbability(nMatching,newSimilarity,objects) | |
| 228 return probabilities | |
| 229 | |
| 230 def findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,spatialThreshold=1.0, delta=180): | |
| 231 if useDestination: | |
| 232 prototypesRoutes=[route] | |
| 233 else: | |
| 234 if route[0] not in noiseEntryNums: | |
| 235 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]] | |
| 236 elif route[1] not in noiseExitNums: | |
| 237 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]] | |
| 238 else: | |
| 239 prototypesRoutes=[x for x in sorted(prototypes.keys())] | |
| 240 lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) | |
| 241 similarity={} | |
| 242 for y in prototypesRoutes: | |
| 243 if y in prototypes: | |
| 244 prototypesIDs=prototypes[y] | |
| 245 for x in prototypesIDs: | |
| 246 s=lcss.computeNormalized(partialObjPositions, objects[x].positions) | |
| 247 if s >= minSimilarity: | |
| 248 similarity[x]=s | |
| 249 | |
| 250 newSimilarity={} | |
| 251 for i in similarity: | |
| 252 if i in secondStepPrototypes: | |
| 253 for j in secondStepPrototypes[i]: | |
| 254 newSimilarity[j]=similarity[i] | |
| 255 probabilities= calculateProbability(nMatching,newSimilarity,objects) | |
| 256 return probabilities | |
| 257 | |
| 258 def getPrototypeTrajectory(obj,route,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True): | |
| 259 partialInterval=moving.Interval(obj.getFirstInstant(),currentInstant) | |
| 260 partialObjPositions= obj.getObjectInTimeInterval(partialInterval).positions | |
| 261 if useSpeedPrototype: | |
| 262 prototypeTrajectories=findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) | |
| 263 else: | |
| 264 prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) | |
| 265 return prototypeTrajectories | |
| 266 | |
| 267 | |
| 268 class PredictionParameters(object): | |
| 269 def __init__(self, name, maxSpeed): | |
| 270 self.name = name | |
| 271 self.maxSpeed = maxSpeed | |
| 272 | |
| 273 def __str__(self): | |
| 274 return '{0} {1}'.format(self.name, self.maxSpeed) | |
| 275 | |
| 276 def generatePredictedTrajectories(self, obj, instant): | |
| 277 return None | |
| 278 | |
| 279 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
| 280 '''returns the lists of collision points and crossing zones''' | |
| 281 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) | |
| 282 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | |
| 283 | |
| 284 collisionPoints = [] | |
| 285 if computeCZ: | |
| 286 crossingZones = [] | |
| 287 else: | |
| 288 crossingZones = None | |
| 289 for et1 in predictedTrajectories1: | |
| 290 for et2 in predictedTrajectories2: | |
| 291 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
| 292 if collision: | |
| 293 collisionPoints.append(SafetyPoint((p1+p2)*0.5, et1.probability*et2.probability, t)) | |
| 294 elif computeCZ: # check if there is a crossing zone | |
| 295 # TODO same computation as PET with metric + concatenate past trajectory with future trajectory | |
| 296 cz = None | |
| 297 t1 = 0 | |
| 298 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 | |
| 299 t2 = 0 | |
| 300 while not cz and t2 < timeHorizon: | |
| 301 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | |
| 302 if cz is not None: | |
| 303 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() | |
| 304 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) | |
| 305 t2 += 1 | |
| 306 t1 += 1 | |
| 307 | |
| 308 if debug: | |
| 309 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | |
| 310 | |
| 311 return collisionPoints, crossingZones | |
| 312 | |
| 313 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1): | |
| 314 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
| 315 collisionPoints = {} | |
| 316 if computeCZ: | |
| 317 crossingZones = {} | |
| 318 else: | |
| 319 crossingZones = None | |
| 320 if timeInterval is not None: | |
| 321 commonTimeInterval = timeInterval | |
| 322 else: | |
| 323 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
| 324 #if nProcesses == 1: | |
| 325 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | |
| 326 cp, cz = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | |
| 327 if len(cp) != 0: | |
| 328 collisionPoints[i] = cp | |
| 329 if computeCZ and len(cz) != 0: | |
| 330 crossingZones[i] = cz | |
| 331 return collisionPoints, crossingZones | |
| 332 | |
| 333 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | |
| 334 '''Computes only collision probabilities | |
| 335 Returns for each instant the collision probability and number of samples drawn''' | |
| 336 collisionProbabilities = {} | |
| 337 if timeInterval is not None: | |
| 338 commonTimeInterval = timeInterval | |
| 339 else: | |
| 340 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
| 341 for i in list(commonTimeInterval)[:-1]: | |
| 342 nCollisions = 0 | |
| 343 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) | |
| 344 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) | |
| 345 for et1 in predictedTrajectories1: | |
| 346 for et2 in predictedTrajectories2: | |
| 347 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
| 348 if collision: | |
| 349 nCollisions += 1 | |
| 350 # take into account probabilities ?? | |
| 351 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | |
| 352 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] | |
| 353 | |
| 354 if debug: | |
| 355 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | |
| 356 | |
| 357 return collisionProbabilities | |
| 358 | |
| 359 class ConstantPredictionParameters(PredictionParameters): | |
| 360 def __init__(self, maxSpeed): | |
| 361 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | |
| 362 | |
| 363 def generatePredictedTrajectories(self, obj, instant): | |
| 364 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)] | |
| 365 | |
| 366 class NormalAdaptationPredictionParameters(PredictionParameters): | |
| 367 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | |
| 368 '''An example of acceleration and steering distributions is | |
| 369 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) | |
| 370 ''' | |
| 371 if useFeatures: | |
| 372 name = 'point set normal adaptation' | |
| 373 else: | |
| 374 name = 'normal adaptation' | |
| 375 PredictionParameters.__init__(self, name, maxSpeed) | |
| 376 self.nPredictedTrajectories = nPredictedTrajectories | |
| 377 self.useFeatures = useFeatures | |
| 378 self.accelerationDistribution = accelerationDistribution | |
| 379 self.steeringDistribution = steeringDistribution | |
| 380 | |
| 381 def __str__(self): | |
| 382 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, | |
| 383 self.maxAcceleration, | |
| 384 self.maxSteering) | |
| 385 | |
| 386 def generatePredictedTrajectories(self, obj, instant): | |
| 387 predictedTrajectories = [] | |
| 388 if self.useFeatures and obj.hasFeatures(): | |
| 389 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | |
| 390 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 391 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 392 else: | |
| 393 positions = [obj.getPositionAtInstant(instant)] | |
| 394 velocities = [obj.getVelocityAtInstant(instant)] | |
| 395 probability = 1./float(len(positions)*self.nPredictedTrajectories) | |
| 396 for i in range(self.nPredictedTrajectories): | |
| 397 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 398 predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition, | |
| 399 initialVelocity, | |
| 400 self.accelerationDistribution, | |
| 401 self.steeringDistribution, | |
| 402 probability, | |
| 403 maxSpeed = self.maxSpeed)) | |
| 404 return predictedTrajectories | |
| 405 | |
| 406 class PointSetPredictionParameters(PredictionParameters): | |
| 407 def __init__(self, maxSpeed): | |
| 408 PredictionParameters.__init__(self, 'point set', maxSpeed) | |
| 409 | |
| 410 def generatePredictedTrajectories(self, obj, instant): | |
| 411 predictedTrajectories = [] | |
| 412 if obj.hasFeatures(): | |
| 413 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | |
| 414 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 415 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 416 probability = 1./float(len(positions)) | |
| 417 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 418 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, probability = probability, maxSpeed = self.maxSpeed)) | |
| 419 return predictedTrajectories | |
| 420 else: | |
| 421 print('Object {} has no features'.format(obj.getNum())) | |
| 422 return None | |
| 423 | |
| 424 | |
| 425 class EvasiveActionPredictionParameters(PredictionParameters): | |
| 426 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | |
| 427 '''Suggested acceleration distribution may not be symmetric, eg | |
| 428 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)''' | |
| 429 | |
| 430 if useFeatures: | |
| 431 name = 'point set evasive action' | |
| 432 else: | |
| 433 name = 'evasive action' | |
| 434 PredictionParameters.__init__(self, name, maxSpeed) | |
| 435 self.nPredictedTrajectories = nPredictedTrajectories | |
| 436 self.useFeatures = useFeatures | |
| 437 self.accelerationDistribution = accelerationDistribution | |
| 438 self.steeringDistribution = steeringDistribution | |
| 439 | |
| 440 def __str__(self): | |
| 441 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
| 442 | |
| 443 def generatePredictedTrajectories(self, obj, instant): | |
| 444 predictedTrajectories = [] | |
| 445 if self.useFeatures and obj.hasFeatures(): | |
| 446 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | |
| 447 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 448 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 449 else: | |
| 450 positions = [obj.getPositionAtInstant(instant)] | |
| 451 velocities = [obj.getVelocityAtInstant(instant)] | |
| 452 probability = 1./float(self.nPredictedTrajectories) | |
| 453 for i in range(self.nPredictedTrajectories): | |
| 454 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 455 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, | |
| 456 initialVelocity, | |
| 457 moving.NormAngle(self.accelerationDistribution(), | |
| 458 self.steeringDistribution()), | |
| 459 probability, | |
| 460 self.maxSpeed)) | |
| 461 return predictedTrajectories | |
| 462 | |
| 463 | |
| 464 class CVDirectPredictionParameters(PredictionParameters): | |
| 465 '''Prediction parameters of prediction at constant velocity | |
| 466 using direct computation of the intersecting point | |
| 467 Warning: the computed time to collision may be higher than timeHorizon (not used)''' | |
| 468 | |
| 469 def __init__(self): | |
| 470 PredictionParameters.__init__(self, 'constant velocity (direct computation)', None) | |
| 471 | |
| 472 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs): | |
| 473 collisionPoints = [] | |
| 474 if computeCZ: | |
| 475 crossingZones = [] | |
| 476 else: | |
| 477 crossingZones = None | |
| 478 | |
| 479 p1 = obj1.getPositionAtInstant(currentInstant) | |
| 480 p2 = obj2.getPositionAtInstant(currentInstant) | |
| 481 if (p1-p2).norm2() <= collisionDistanceThreshold: | |
| 482 collisionPoints = [SafetyPoint((p1+p2)*0.5, 1., 0.)] | |
| 483 else: | |
| 484 v1 = obj1.getVelocityAtInstant(currentInstant) | |
| 485 v2 = obj2.getVelocityAtInstant(currentInstant) | |
| 486 intersection = moving.intersection(p1, p1+v1, p2, p2+v2) | |
| 487 | |
| 488 if intersection is not None: | |
| 489 dp1 = intersection-p1 | |
| 490 dp2 = intersection-p2 | |
| 491 dot1 = moving.Point.dot(dp1, v1) | |
| 492 dot2 = moving.Point.dot(dp2, v2) | |
| 493 if (computeCZ and (dot1 > 0 or dot2 > 0)) or (dot1 > 0 and dot2 > 0): # if the road users are moving towards the intersection or if computing pPET | |
| 494 dist1 = dp1.norm2() | |
| 495 dist2 = dp2.norm2() | |
| 496 s1 = math.copysign(v1.norm2(), dot1) | |
| 497 s2 = math.copysign(v2.norm2(), dot2) | |
| 498 halfCollisionDistanceThreshold = collisionDistanceThreshold/2. | |
| 499 timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) | |
| 500 timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) | |
| 501 collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) | |
| 502 | |
| 503 if collisionTimeInterval.empty(): | |
| 504 if computeCZ: | |
| 505 crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] | |
| 506 else: | |
| 507 collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] | |
| 508 | |
| 509 if debug and intersection is not None: | |
| 510 from matplotlib.pyplot import plot, figure, axis, title | |
| 511 figure() | |
| 512 plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') | |
| 513 plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') | |
| 514 intersection.plot() | |
| 515 obj1.plot('r') | |
| 516 obj2.plot('b') | |
| 517 title('instant {0}'.format(currentInstant)) | |
| 518 axis('equal') | |
| 519 | |
| 520 return collisionPoints, crossingZones | |
| 521 | |
| 522 class CVExactPredictionParameters(PredictionParameters): | |
| 523 '''Prediction parameters of prediction at constant velocity | |
| 524 using direct computation of the intersecting point (solving the equation) | |
| 525 Warning: the computed time to collision may be higher than timeHorizon (not used)''' | |
| 526 | |
| 527 def __init__(self): | |
| 528 PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None) | |
| 529 | |
| 530 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs): | |
| 531 'TODO compute pPET' | |
| 532 collisionPoints = [] | |
| 533 crossingZones = [] | |
| 534 | |
| 535 p1 = obj1.getPositionAtInstant(currentInstant) | |
| 536 p2 = obj2.getPositionAtInstant(currentInstant) | |
| 537 v1 = obj1.getVelocityAtInstant(currentInstant) | |
| 538 v2 = obj2.getVelocityAtInstant(currentInstant) | |
| 539 #intersection = moving.intersection(p1, p1+v1, p2, p2+v2) | |
| 540 | |
| 541 if not moving.Point.parallel(v1, v2): | |
| 542 ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold) | |
| 543 if ttc is not None: | |
| 544 collisionPoints = [SafetyPoint((p1+(v1*ttc)+p2+(v2*ttc))*0.5, 1., ttc)] | |
| 545 else: | |
| 546 pass # compute pPET | |
| 547 | |
| 548 return collisionPoints, crossingZones | |
| 549 | |
| 550 class PrototypePredictionParameters(PredictionParameters): | |
| 551 def __init__(self, prototypes, nPredictedTrajectories, pointSimilarityDistance, minSimilarity, lcssMetric = 'cityblock', minFeatureTime = 10, constantSpeed = False, useFeatures = True): | |
| 552 PredictionParameters.__init__(self, 'prototypes', None) | |
| 553 self.prototypes = prototypes | |
| 554 self.nPredictedTrajectories = nPredictedTrajectories | |
| 555 self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance) | |
| 556 self.minSimilarity = minSimilarity | |
| 557 self.minFeatureTime = minFeatureTime | |
| 558 self.constantSpeed = constantSpeed | |
| 559 self.useFeatures = useFeatures | |
| 560 | |
| 561 def getLcss(self): | |
| 562 return self.lcss | |
| 563 | |
| 564 def addPredictedTrajectories(self, predictedTrajectories, obj, instant): | |
| 565 obj.computeTrajectorySimilarities(self.prototypes, self.lcss) | |
| 566 for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): | |
| 567 if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: | |
| 568 initialPosition = obj.getPositionAtInstant(instant) | |
| 569 initialVelocity = obj.getVelocityAtInstant(instant) | |
| 570 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) | |
| 571 | |
| 572 def generatePredictedTrajectories(self, obj, instant): | |
| 573 predictedTrajectories = [] | |
| 574 if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: | |
| 575 if self.useFeatures and obj.hasFeatures(): | |
| 576 if not hasattr(obj, 'currentPredictionFeatures'): | |
| 577 obj.currentPredictionFeatures = [] | |
| 578 else: | |
| 579 obj.currentPredictionFeatures[:] = [f for f in obj.currentPredictionFeatures if f.existsAtInstant(instant)] | |
| 580 firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant) and f not in obj.currentPredictionFeatures] | |
| 581 firstInstants.sort(key = lambda t: t[1]) | |
| 582 for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants), self.nPredictedTrajectories-len(obj.currentPredictionFeatures))]: | |
| 583 obj.currentPredictionFeatures.append(f) | |
| 584 for f in obj.currentPredictionFeatures: | |
| 585 self.addPredictedTrajectories(predictedTrajectories, f, instant) | |
| 586 else: | |
| 587 self.addPredictedTrajectories(predictedTrajectories, obj, instant) | |
| 588 return predictedTrajectories | |
| 589 | |
| 590 if __name__ == "__main__": | |
| 591 import doctest | |
| 592 import unittest | |
| 593 suite = doctest.DocFileSuite('tests/prediction.txt') | |
| 594 #suite = doctest.DocTestSuite() | |
| 595 unittest.TextTestRunner().run(suite) | |
| 596 #doctest.testmod() | |
| 597 #doctest.testfile("example.txt") | |
| 598 |
