Mercurial > hg > nsaunier > traffic-intelligence
comparison python/extrapolation.py @ 268:0c0b92f621f6
reorganized to compute evasive action for multiple positions
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Sat, 28 Jul 2012 02:58:47 -0400 |
| parents | 32e88b513f5c |
| children | a9988971aac8 |
comparison
equal
deleted
inserted
replaced
| 267:32e88b513f5c | 268:0c0b92f621f6 |
|---|---|
| 1 #! /usr/bin/env python | 1 #! /usr/bin/env python |
| 2 '''Library for moving object extrapolation hypotheses''' | 2 '''Library for moving object extrapolation hypotheses''' |
| 3 | 3 |
| 4 import moving | 4 import moving |
| 5 import math | 5 import math |
| 6 import random | |
| 6 | 7 |
| 7 class ExtrapolatedTrajectory: | 8 class ExtrapolatedTrajectory: |
| 8 '''Class for extrapolated trajectories with lazy evaluation | 9 '''Class for extrapolated trajectories with lazy evaluation |
| 9 if the predicted position has not been already computed, compute it | 10 if the predicted position has not been already computed, compute it |
| 10 | 11 |
| 68 self.maxSpeed = maxSpeed | 69 self.maxSpeed = maxSpeed |
| 69 | 70 |
| 70 def __str__(self): | 71 def __str__(self): |
| 71 return '{0} {1}'.format(self.name, self.maxSpeed) | 72 return '{0} {1}'.format(self.name, self.maxSpeed) |
| 72 | 73 |
| 73 # refactor with classes and subclasses | 74 class ConstantExtrapolationParameters(ExtrapolationParameters): |
| 74 def createExtrapolatedTrajectories(self, obj, instant): | 75 def __init__(self, maxSpeed): |
| 75 '''extrapolationParameters specific to each method (in name field) ''' | 76 ExtrapolationParameters.__init__(self, 'constant velocity', maxSpeed) |
| 77 | |
| 78 def generateExtrapolatedTrajectories(self, obj, instant): | |
| 79 return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
| 80 maxSpeed = self.maxSpeed)] | |
| 81 | |
| 82 class NormalAdaptationExtrapolationParameters(ExtrapolationParameters): | |
| 83 def __init__(self, maxSpeed, nExtrapolatedTrajectories, maxAcceleration, maxSteering): | |
| 84 ExtrapolationParameters.__init__(self, 'normal adaptation', maxSpeed) | |
| 85 self.nExtrapolatedTrajectories = nExtrapolatedTrajectories | |
| 86 self.maxAcceleration = maxAcceleration | |
| 87 self.maxSteering = maxSteering | |
| 88 self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, | |
| 89 self.maxAcceleration, 0.) | |
| 90 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
| 91 self.maxSteering, 0.) | |
| 92 | |
| 93 def __str__(self): | |
| 94 return ExtrapolationParameters.__str__(self)+' {0} {1} {2}'.format(self.nExtrapolatedTrajectories, | |
| 95 self.maxAcceleration, | |
| 96 self.maxSteering) | |
| 97 | |
| 98 def generateExtrapolatedTrajectories(self, obj, instant): | |
| 76 extrapolatedTrajectories = [] | 99 extrapolatedTrajectories = [] |
| 77 if self.name == 'constant velocity': | 100 for i in xrange(self.nExtrapolatedTrajectories): |
| 78 extrapolatedTrajectories = [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | 101 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), |
| 79 maxSpeed = self.maxSpeed)] | 102 obj.getVelocityAtInstant(instant), |
| 80 elif self.name == 'normal adaptation': | 103 self.accelerationDistribution, |
| 81 for i in xrange(self.nExtrapolatedTrajectories): | 104 self.steeringDistribution, |
| 82 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), | 105 maxSpeed = self.maxSpeed)) |
| 83 obj.getVelocityAtInstant(instant), | 106 return extrapolatedTrajectories |
| 84 self.accelerationDistribution, | 107 |
| 85 self.steeringDistribution, | 108 class PointSetExtrapolationParameters(ExtrapolationParameters): |
| 86 maxSpeed = self.maxSpeed)) | 109 def __init__(self, maxSpeed): |
| 87 elif self.name == 'pointset': | 110 ExtrapolationParameters.__init__(self, 'point set', maxSpeed) |
| 111 | |
| 112 def generateExtrapolatedTrajectories(self, obj, instant): | |
| 113 extrapolatedTrajectories = [] | |
| 114 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
| 115 positions = [f.getPositionAtInstant(instant) for f in features] | |
| 116 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
| 117 for initialPosition,initialVelocity in zip(positions, velocities): | |
| 118 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | |
| 119 maxSpeed = self.maxSpeed)) | |
| 120 return extrapolatedTrajectories | |
| 121 | |
| 122 class EvasiveActionExtrapolationParameters(ExtrapolationParameters): | |
| 123 def __init__(self, maxSpeed, nExtrapolatedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): | |
| 124 if useFeatures: | |
| 125 name = 'point set evasive action' | |
| 126 else: | |
| 127 name = 'evasive action' | |
| 128 ExtrapolationParameters.__init__(self, name, maxSpeed) | |
| 129 self.nExtrapolatedTrajectories = nExtrapolatedTrajectories | |
| 130 self.minAcceleration = minAcceleration | |
| 131 self.maxAcceleration = maxAcceleration | |
| 132 self.maxSteering = maxSteering | |
| 133 self.useFeatures = useFeatures | |
| 134 self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, | |
| 135 self.maxAcceleration, 0.) | |
| 136 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
| 137 self.maxSteering, 0.) | |
| 138 | |
| 139 def __str__(self): | |
| 140 return ExtrapolationParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nExtrapolatedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
| 141 | |
| 142 def generateExtrapolatedTrajectories(self, obj, instant): | |
| 143 extrapolatedTrajectories = [] | |
| 144 if self.useFeatures: | |
| 88 features = [f for f in obj.features if f.existsAtInstant(instant)] | 145 features = [f for f in obj.features if f.existsAtInstant(instant)] |
| 89 positions = [f.getPositionAtInstant(instant) for f in features] | 146 positions = [f.getPositionAtInstant(instant) for f in features] |
| 90 velocities = [f.getVelocityAtInstant(instant) for f in features] | 147 velocities = [f.getVelocityAtInstant(instant) for f in features] |
| 148 else: | |
| 149 positions = [obj.getPositionAtInstant(instant)] | |
| 150 velocities = [obj.getVelocityAtInstant(instant)] | |
| 151 for i in xrange(self.nExtrapolatedTrajectories): | |
| 91 for initialPosition,initialVelocity in zip(positions, velocities): | 152 for initialPosition,initialVelocity in zip(positions, velocities): |
| 92 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | 153 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, |
| 154 initialVelocity, | |
| 155 moving.NormAngle(self.accelerationDistribution(), | |
| 156 self.steeringDistribution()), | |
| 93 maxSpeed = self.maxSpeed)) | 157 maxSpeed = self.maxSpeed)) |
| 94 elif self.name == 'evasive action': | |
| 95 for i in xrange(self.nExtrapolatedTrajectories): | |
| 96 | |
| 97 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), | |
| 98 obj.getVelocityAtInstant(instant), | |
| 99 moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), | |
| 100 maxSpeed = self.maxSpeed)) | |
| 101 else: | |
| 102 print('Unknown extrapolation hypothesis') | |
| 103 return extrapolatedTrajectories | 158 return extrapolatedTrajectories |
| 104 | 159 |
| 105 class SafetyPoint(moving.Point): | 160 class SafetyPoint(moving.Point): |
| 106 '''Can represent a collision point or crossing zone | 161 '''Can represent a collision point or crossing zone |
| 107 with respective safety indicator, TTC or pPET''' | 162 with respective safety indicator, TTC or pPET''' |
| 130 t += 1 | 185 t += 1 |
| 131 return t, p1, p2 | 186 return t, p1, p2 |
| 132 | 187 |
| 133 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): | 188 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): |
| 134 '''returns the lists of collision points and crossing zones ''' | 189 '''returns the lists of collision points and crossing zones ''' |
| 135 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) | 190 extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) |
| 136 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) | 191 extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) |
| 137 | 192 |
| 138 collisionPoints = [] | 193 collisionPoints = [] |
| 139 crossingZones = [] | 194 crossingZones = [] |
| 140 for et1 in extrapolatedTrajectories1: | 195 for et1 in extrapolatedTrajectories1: |
| 141 for et2 in extrapolatedTrajectories2: | 196 for et2 in extrapolatedTrajectories2: |
| 162 return collisionPoints, crossingZones | 217 return collisionPoints, crossingZones |
| 163 | 218 |
| 164 def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): | 219 def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): |
| 165 collisionPoints={} | 220 collisionPoints={} |
| 166 crossingZones={} | 221 crossingZones={} |
| 167 #TTCs = {} | |
| 168 #pPETs = {} | |
| 169 if timeInterval: | 222 if timeInterval: |
| 170 commonTimeInterval = timeInterval | 223 commonTimeInterval = timeInterval |
| 171 else: | 224 else: |
| 172 commonTimeInterval = obj1.commonTimeInterval(obj2) | 225 commonTimeInterval = obj1.commonTimeInterval(obj2) |
| 173 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | 226 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors |
| 174 print(obj1.num, obj2.num, i) | 227 print(obj1.num, obj2.num, i) |
| 175 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon) | 228 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon) |
| 176 # if len(collisionPoints[i]) > 0: | |
| 177 # TTCs[i] = extrapolation.computeExpectedIndicator(collisionPoints[i]) | |
| 178 # if len(crossingZones[i]) > 0: | |
| 179 # pPETs[i] = extrapolation.computeExpectedIndicator(crossingZones[i]) | |
| 180 SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) | 229 SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) |
| 181 SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) | 230 SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) |
| 182 | 231 |
| 183 if debug: | 232 if debug: |
| 184 from matplotlib.pyplot import figure, axis, title | 233 from matplotlib.pyplot import figure, axis, title |
| 195 title('instant {0}'.format(i)) | 244 title('instant {0}'.format(i)) |
| 196 axis('equal') | 245 axis('equal') |
| 197 | 246 |
| 198 return collisionPoints, crossingZones | 247 return collisionPoints, crossingZones |
| 199 | 248 |
| 200 def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False): | 249 def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): |
| 201 collisionProbabilities = {} | 250 collisionProbabilities = {} |
| 202 for i in list(obj1.commonTimeInterval(obj2))[:-1]: | 251 if timeInterval: |
| 252 commonTimeInterval = timeInterval | |
| 253 else: | |
| 254 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
| 255 for i in list(commonTimeInterval)[:-1]: | |
| 203 nCollisions = 0 | 256 nCollisions = 0 |
| 204 print(obj1.num, obj2.num, i) | 257 print(obj1.num, obj2.num, i) |
| 205 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) | 258 extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) |
| 206 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) | 259 extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) |
| 207 for et1 in extrapolatedTrajectories1: | 260 for et1 in extrapolatedTrajectories1: |
| 208 for et2 in extrapolatedTrajectories2: | 261 for et2 in extrapolatedTrajectories2: |
| 209 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | 262 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) |
| 210 if t <= timeHorizon: | 263 if t <= timeHorizon: |
| 211 nCollisions += 1 | 264 nCollisions += 1 |
| 212 # take into account probabilities ?? | 265 # take into account probabilities ?? |
| 213 collisionProbabilities[i] = float(nCollisions)/extrapolationParameters.nExtrapolatedTrajectories**2 | 266 nSamples = float(len(extrapolatedTrajectories1)*len(extrapolatedTrajectories2)) |
| 214 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, extrapolationParameters.nExtrapolatedTrajectories, i, collisionProbabilities[i])) | 267 collisionProbabilities[i] = float(nCollisions)/nSamples |
| 268 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) | |
| 215 | 269 |
| 216 if debug: | 270 if debug: |
| 217 from matplotlib.pyplot import figure, axis, title | 271 from matplotlib.pyplot import figure, axis, title |
| 218 figure() | 272 figure() |
| 219 obj1.draw('r') | 273 obj1.draw('r') |
