Mercurial > hg > nsaunier > traffic-intelligence
comparison python/extrapolation.py @ 267:32e88b513f5c
added code to compute probability of collision
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Fri, 27 Jul 2012 20:32:14 -0400 |
| parents | aba9711b3149 |
| children | 0c0b92f621f6 |
comparison
equal
deleted
inserted
replaced
| 266:aba9711b3149 | 267:32e88b513f5c |
|---|---|
| 68 self.maxSpeed = maxSpeed | 68 self.maxSpeed = maxSpeed |
| 69 | 69 |
| 70 def __str__(self): | 70 def __str__(self): |
| 71 return '{0} {1}'.format(self.name, self.maxSpeed) | 71 return '{0} {1}'.format(self.name, self.maxSpeed) |
| 72 | 72 |
| 73 def createExtrapolatedTrajectories(extrapolationParameters, obj, instant): | 73 # refactor with classes and subclasses |
| 74 '''extrapolationParameters specific to each method (in name field) ''' | 74 def createExtrapolatedTrajectories(self, obj, instant): |
| 75 if extrapolationParameters.name == 'constant velocity': | 75 '''extrapolationParameters specific to each method (in name field) ''' |
| 76 return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
| 77 maxSpeed = extrapolationParameters.maxSpeed)] | |
| 78 elif extrapolationParameters.name == 'normal adaptation': | |
| 79 extrapolatedTrajectories = [] | 76 extrapolatedTrajectories = [] |
| 80 for i in xrange(extrapolationParameters.nExtrapolatedTrajectories): | 77 if self.name == 'constant velocity': |
| 81 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), | 78 extrapolatedTrajectories = [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), |
| 82 obj.getVelocityAtInstant(instant), | 79 maxSpeed = self.maxSpeed)] |
| 83 extrapolationParameters.accelerationDistribution, | 80 elif self.name == 'normal adaptation': |
| 84 extrapolationParameters.steeringDistribution, | 81 for i in xrange(self.nExtrapolatedTrajectories): |
| 85 maxSpeed = extrapolationParameters.maxSpeed)) | 82 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), |
| 83 obj.getVelocityAtInstant(instant), | |
| 84 self.accelerationDistribution, | |
| 85 self.steeringDistribution, | |
| 86 maxSpeed = self.maxSpeed)) | |
| 87 elif self.name == 'pointset': | |
| 88 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 89 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 90 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 91 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 92 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | |
| 93 maxSpeed = self.maxSpeed)) | |
| 94 elif self.name == 'evasive action': | |
| 95 for i in xrange(self.nExtrapolatedTrajectories): | |
| 96 | |
| 97 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), | |
| 98 obj.getVelocityAtInstant(instant), | |
| 99 moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), | |
| 100 maxSpeed = self.maxSpeed)) | |
| 101 else: | |
| 102 print('Unknown extrapolation hypothesis') | |
| 86 return extrapolatedTrajectories | 103 return extrapolatedTrajectories |
| 87 elif extrapolationParameters.name == 'pointset': | |
| 88 extrapolatedTrajectories = [] | |
| 89 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 90 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 91 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 92 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 93 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | |
| 94 maxSpeed = extrapolationParameters.maxSpeed)) | |
| 95 return extrapolatedTrajectories | |
| 96 else: | |
| 97 print('Unknown extrapolation hypothesis') | |
| 98 return [] | |
| 99 | 104 |
| 100 class SafetyPoint(moving.Point): | 105 class SafetyPoint(moving.Point): |
| 101 '''Can represent a collision point or crossing zone | 106 '''Can represent a collision point or crossing zone |
| 102 with respective safety indicator, TTC or pPET''' | 107 with respective safety indicator, TTC or pPET''' |
| 103 def __init__(self, p, probability = 1., indicator = -1): | 108 def __init__(self, p, probability = 1., indicator = -1): |
| 113 | 118 |
| 114 def computeExpectedIndicator(points): | 119 def computeExpectedIndicator(points): |
| 115 from numpy import sum | 120 from numpy import sum |
| 116 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | 121 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) |
| 117 | 122 |
| 123 def computeCollisionTime(extrapolatedTrajectory1, extrapolatedTrajectory2, collisionDistanceThreshold, timeHorizon): | |
| 124 t = 1 | |
| 125 p1 = extrapolatedTrajectory1.predictPosition(t) | |
| 126 p2 = extrapolatedTrajectory2.predictPosition(t) | |
| 127 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
| 128 p1 = extrapolatedTrajectory1.predictPosition(t) | |
| 129 p2 = extrapolatedTrajectory2.predictPosition(t) | |
| 130 t += 1 | |
| 131 return t, p1, p2 | |
| 132 | |
| 118 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): | 133 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): |
| 119 '''returns the lists of collision points and crossing zones ''' | 134 '''returns the lists of collision points and crossing zones ''' |
| 120 extrapolatedTrajectories1 = createExtrapolatedTrajectories(extrapolationParameters, obj1, i) | 135 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) |
| 121 extrapolatedTrajectories2 = createExtrapolatedTrajectories(extrapolationParameters, obj2, i) | 136 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) |
| 122 | 137 |
| 123 collisionPoints = [] | 138 collisionPoints = [] |
| 124 crossingZones = [] | 139 crossingZones = [] |
| 125 for et1 in extrapolatedTrajectories1: | 140 for et1 in extrapolatedTrajectories1: |
| 126 for et2 in extrapolatedTrajectories2: | 141 for et2 in extrapolatedTrajectories2: |
| 127 t = 1 | 142 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) |
| 128 p1 = et1.predictPosition(t) | 143 |
| 129 p2 = et2.predictPosition(t) | |
| 130 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
| 131 p1 = et1.predictPosition(t) | |
| 132 p2 = et2.predictPosition(t) | |
| 133 t += 1 | |
| 134 | |
| 135 if t <= timeHorizon: | 144 if t <= timeHorizon: |
| 136 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) | 145 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) |
| 137 else: # check if there is a crossing zone | 146 else: # check if there is a crossing zone |
| 138 # TODO? zone should be around the points at which the traj are the closest | 147 # TODO? zone should be around the points at which the traj are the closest |
| 139 # look for CZ at different times, otherwise it would be a collision | 148 # look for CZ at different times, otherwise it would be a collision |
| 183 for et in extrapolatedTrajectories2: | 192 for et in extrapolatedTrajectories2: |
| 184 et.predictPosition(timeHorizon) | 193 et.predictPosition(timeHorizon) |
| 185 et.draw('bx') | 194 et.draw('bx') |
| 186 title('instant {0}'.format(i)) | 195 title('instant {0}'.format(i)) |
| 187 axis('equal') | 196 axis('equal') |
| 197 | |
| 198 return collisionPoints, crossingZones | |
| 199 | |
| 200 def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False): | |
| 201 collisionProbabilities = {} | |
| 202 for i in list(obj1.commonTimeInterval(obj2))[:-1]: | |
| 203 nCollisions = 0 | |
| 204 print(obj1.num, obj2.num, i) | |
| 205 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) | |
| 206 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) | |
| 207 for et1 in extrapolatedTrajectories1: | |
| 208 for et2 in extrapolatedTrajectories2: | |
| 209 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
| 210 if t <= timeHorizon: | |
| 211 nCollisions += 1 | |
| 212 # take into account probabilities ?? | |
| 213 collisionProbabilities[i] = float(nCollisions)/extrapolationParameters.nExtrapolatedTrajectories**2 | |
| 214 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, extrapolationParameters.nExtrapolatedTrajectories, i, collisionProbabilities[i])) | |
| 215 | |
| 216 if debug: | |
| 217 from matplotlib.pyplot import figure, axis, title | |
| 218 figure() | |
| 219 obj1.draw('r') | |
| 220 obj2.draw('b') | |
| 221 for et in extrapolatedTrajectories1: | |
| 222 et.predictPosition(timeHorizon) | |
| 223 et.draw('rx') | |
| 188 | 224 |
| 189 | 225 for et in extrapolatedTrajectories2: |
| 190 # probability of successful evasive action = 1-P(Collision) using larger control values | 226 et.predictPosition(timeHorizon) |
| 191 | 227 et.draw('bx') |
| 192 return collisionPoints, crossingZones | 228 title('instant {0}'.format(i)) |
| 229 axis('equal') | |
| 230 | |
| 231 return collisionProbabilities | |
| 193 | 232 |
| 194 ############### | 233 ############### |
| 195 | 234 |
| 196 # Default values: to remove because we cannot tweak that from a script where the value may be different | 235 # Default values: to remove because we cannot tweak that from a script where the value may be different |
| 197 FPS= 25 # No. of frame per second (FPS) | 236 FPS= 25 # No. of frame per second (FPS) |
