Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/prediction.py @ 1211:a095d4fbb2ea
work in progress
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Tue, 02 May 2023 07:12:26 -0400 |
| parents | 1bad5f9b60de |
| children | af329f3330ba |
comparison
equal
deleted
inserted
replaced
| 1210:1bad5f9b60de | 1211:a095d4fbb2ea |
|---|---|
| 32 return moving.Trajectory.fromPointList(list(self.predictedPositions.values())) | 32 return moving.Trajectory.fromPointList(list(self.predictedPositions.values())) |
| 33 | 33 |
| 34 def getPredictedSpeeds(self): | 34 def getPredictedSpeeds(self): |
| 35 return [so.norm for so in self.predictedSpeedOrientations.values()] | 35 return [so.norm for so in self.predictedSpeedOrientations.values()] |
| 36 | 36 |
| 37 def hasCollisionWith(self, other, t, collisionDistanceThreshold): | |
| 38 p1 = self.predictPosition(t) | |
| 39 p2 = other.predictPosition(t) | |
| 40 return (p1-p2).norm2() <= collisionDistanceThreshold, p1, p2 | |
| 41 | |
| 37 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): | 42 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): |
| 38 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs) | 43 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs) |
| 39 | 44 |
| 40 class PredictedTrajectoryConstant(PredictedTrajectory): | 45 class PredictedTrajectoryConstant(PredictedTrajectory): |
| 41 '''Predicted trajectory at constant speed or acceleration | 46 '''Predicted trajectory at constant speed or acceleration |
| 45 self.control = control | 50 self.control = control |
| 46 self.maxSpeed = maxSpeed | 51 self.maxSpeed = maxSpeed |
| 47 self.probability = probability | 52 self.probability = probability |
| 48 self.predictedPositions = {0: initialPosition} | 53 self.predictedPositions = {0: initialPosition} |
| 49 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 54 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
| 50 | 55 |
| 51 def getControl(self): | 56 def getControl(self): |
| 52 return self.control | 57 return self.control |
| 53 | 58 |
| 59 class PredictedTrajectoryConstantVelocity(PredictedTrajectory): | |
| 60 '''Predicted trajectory at constant speed or acceleration | |
| 61 TODO generalize by passing a series of velocities/accelerations''' | |
| 62 | |
| 63 def __init__(self, initialPosition, initialVelocity, probability = 1.): | |
| 64 self.probability = probability | |
| 65 self.predictedPositions = {0: initialPosition} | |
| 66 self.initialVelocity = initialVelocity | |
| 67 #self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
| 68 | |
| 69 def predictPosition(self, nTimeSteps): | |
| 70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions: | |
| 71 self.predictPosition(nTimeSteps-1) | |
| 72 self.predictedPositions[nTimeSteps] = self.predictedPositions[nTimeSteps-1]+self.initialVelocity | |
| 73 return self.predictedPositions[nTimeSteps] | |
| 74 | |
| 75 class PredictedPolyTrajectoryConstant(PredictedTrajectory): | |
| 76 '''Predicted trajectory of an object with an outline represented by a polygon | |
| 77 Simple method that just translates the polygon corners''' | |
| 78 def __init__(self, polyCorners, initialVelocity, probability = 1.): | |
| 79 self.probability = probability | |
| 80 self.predictedPositions = {0: polyCorners} | |
| 81 self.initialVelocity = initialVelocity | |
| 82 self.predictedTrajectories = [PredictedTrajectoryConstant(p, initialVelocity) for p in polyCorners] | |
| 83 | |
| 54 class PredictedTrajectoryPrototype(PredictedTrajectory): | 84 class PredictedTrajectoryPrototype(PredictedTrajectory): |
| 55 '''Predicted trajectory that follows a prototype trajectory | 85 '''Predicted trajectory that follows a prototype trajectory |
| 56 The prototype is in the format of a moving.Trajectory: it could be | 86 The prototype is in the format of a moving.Trajectory: it could be |
| 57 1. an observed trajectory (extracted from video) | 87 1. an observed trajectory (extracted from video) |
| 58 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow | 88 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow |
| 153 def computeExpectedIndicator(points): | 183 def computeExpectedIndicator(points): |
| 154 return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | 184 return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) |
| 155 | 185 |
| 156 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | 186 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): |
| 157 '''Computes the first instant | 187 '''Computes the first instant |
| 158 at which two predicted trajectories are within some distance threshold | 188 at which two predicted trajectories are colliding (depending on the trajectory format) |
| 159 Computes all the times including timeHorizon | 189 Computes all the times including timeHorizon |
| 160 | 190 |
| 161 User has to check the first variable collision to know about a collision''' | 191 User has to check the first variable collision to know about a collision''' |
| 162 t = 1 | 192 t = 1 |
| 163 p1 = predictedTrajectory1.predictPosition(t) | 193 collision, p1, p2 = predictedTrajectory1.hasCollisionWith(predictedTrajectory2, t, collisionDistanceThreshold) |
| 164 p2 = predictedTrajectory2.predictPosition(t) | |
| 165 collision = (p1-p2).norm2() <= collisionDistanceThreshold | |
| 166 while t < timeHorizon and not collision: | 194 while t < timeHorizon and not collision: |
| 167 t += 1 | 195 t += 1 |
| 168 p1 = predictedTrajectory1.predictPosition(t) | 196 collision, p1, p2 = predictedTrajectory1.hasCollisionWith(predictedTrajectory2, t, collisionDistanceThreshold) |
| 169 p2 = predictedTrajectory2.predictPosition(t) | |
| 170 collision = (p1-p2).norm2() <= collisionDistanceThreshold | |
| 171 return collision, t, p1, p2 | 197 return collision, t, p1, p2 |
| 172 | 198 |
| 173 def computeCollisionTimePolygon(predictedTrajectories1, predictedTrajectories2, timeHorizon): | 199 def computeCollisionTimePolygon(predictedTrajectories1, predictedTrajectories2, timeHorizon): |
| 174 '''Computes the first instant | 200 '''Computes the first instant |
| 175 at which two objects represented by a series of points (eg box) | 201 at which two objects represented by a series of points (eg box) |
| 183 while t < timeHorizon and not collision: | 209 while t < timeHorizon and not collision: |
| 184 t += 1 | 210 t += 1 |
| 185 poly1 = moving.pointsToShapely([t1.predictPosition(t) for t1 in predictedTrajectories1]) | 211 poly1 = moving.pointsToShapely([t1.predictPosition(t) for t1 in predictedTrajectories1]) |
| 186 poly2 = moving.pointsToShapely([t2.predictPosition(t) for t2 in predictedTrajectories2]) | 212 poly2 = moving.pointsToShapely([t2.predictPosition(t) for t2 in predictedTrajectories2]) |
| 187 collision = poly1.overlaps(poly2) | 213 collision = poly1.overlaps(poly2) |
| 188 return collision, t, p1, p2 | 214 return collision, t |
| 189 | 215 |
| 190 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True): | 216 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True): |
| 191 from matplotlib.pyplot import figure, axis, title, clf, savefig | 217 from matplotlib.pyplot import figure, axis, title, clf, savefig |
| 192 if printFigure: | 218 if printFigure: |
| 193 clf() | 219 clf() |
| 378 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | 404 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) |
| 379 | 405 |
| 380 return collisionProbabilities | 406 return collisionProbabilities |
| 381 | 407 |
| 382 class ConstantPredictionParameters(PredictionParameters): | 408 class ConstantPredictionParameters(PredictionParameters): |
| 383 def __init__(self, maxSpeed): | 409 def __init__(self): |
| 384 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | 410 PredictionParameters.__init__(self, 'constant velocity') |
| 385 | 411 |
| 386 def generatePredictedTrajectories(self, obj, instant): | 412 def generatePredictedTrajectories(self, obj, instant): |
| 387 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)] | 413 return [PredictedTrajectoryConstantVelocity(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant))] |
| 388 | 414 |
| 389 class NormalAdaptationPredictionParameters(PredictionParameters): | 415 class NormalAdaptationPredictionParameters(PredictionParameters): |
| 390 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | 416 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): |
| 391 '''An example of acceleration and steering distributions is | 417 '''An example of acceleration and steering distributions is |
| 392 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) | 418 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) |
| 436 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | 462 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] |
| 437 positions = [f.getPositionAtInstant(instant) for f in features] | 463 positions = [f.getPositionAtInstant(instant) for f in features] |
| 438 velocities = [f.getVelocityAtInstant(instant) for f in features] | 464 velocities = [f.getVelocityAtInstant(instant) for f in features] |
| 439 probability = 1./float(len(positions)) | 465 probability = 1./float(len(positions)) |
| 440 for initialPosition,initialVelocity in zip(positions, velocities): | 466 for initialPosition,initialVelocity in zip(positions, velocities): |
| 441 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, probability = probability, maxSpeed = self.maxSpeed)) | 467 predictedTrajectories.append(PredictedTrajectoryConstantVelocity(initialPosition, initialVelocity, probability = probability)) |
| 442 return predictedTrajectories | 468 return predictedTrajectories |
| 443 else: | 469 else: |
| 444 print('Object {} has no features'.format(obj.getNum())) | 470 print('Object {} has no features'.format(obj.getNum())) |
| 445 return None | 471 return None |
| 446 | 472 |
