Mercurial > hg > nsaunier > traffic-intelligence
comparison python/traffic_engineering.py @ 614:5e09583275a4
Merged Nicolas/trafficintelligence into default
| author | Mohamed Gomaa <eng.m.gom3a@gmail.com> |
|---|---|
| date | Fri, 05 Dec 2014 12:13:53 -0500 |
| parents | 850ed17c7b2f |
| children | 3b13ec964476 |
comparison
equal
deleted
inserted
replaced
| 598:11f96bd08552 | 614:5e09583275a4 |
|---|---|
| 9 | 9 |
| 10 | 10 |
| 11 ######################### | 11 ######################### |
| 12 # Simulation | 12 # Simulation |
| 13 ######################### | 13 ######################### |
| 14 | |
| 15 def generateTimeHeadways(meanTimeHeadway, simulationTime): | |
| 16 '''Generates the time headways between arrivals | |
| 17 given the meanTimeHeadway and the negative exponential distribution | |
| 18 over a time interval of length simulationTime (assumed to be in same time unit as headway''' | |
| 19 from random import expovariate | |
| 20 headways = [] | |
| 21 totalTime = 0 | |
| 22 flow = 1/meanTimeHeadway | |
| 23 while totalTime < simulationTime: | |
| 24 h = expovariate(flow) | |
| 25 headways.append(h) | |
| 26 totalTime += h | |
| 27 return headways | |
| 14 | 28 |
| 15 class Vehicle: | 29 class Vehicle: |
| 16 '''Generic vehicle class | 30 '''Generic vehicle class |
| 17 1D coordinates for now''' | 31 1D coordinates for now''' |
| 18 class PredictedTrajectory1D(prediction.PredictedTrajectory): | 32 class PredictedTrajectory1D(prediction.PredictedTrajectory): |
| 157 if sum(proportions) == 1: | 171 if sum(proportions) == 1: |
| 158 self.volume = volume | 172 self.volume = volume |
| 159 self.types = types | 173 self.types = types |
| 160 self.proportions = proportions | 174 self.proportions = proportions |
| 161 self.equivalents = equivalents | 175 self.equivalents = equivalents |
| 162 self.nLanes = nLanes # unused | 176 self.nLanes = nLanes |
| 163 else: | 177 else: |
| 164 print('Proportions do not sum to 1') | 178 print('Proportions do not sum to 1') |
| 165 pass | 179 pass |
| 180 | |
| 181 def checkProtected(self, opposedThroughMvt): | |
| 182 '''Checks if this left movement should be protected, | |
| 183 ie if one of the main two conditions on left turn is verified''' | |
| 184 return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 | |
| 166 | 185 |
| 167 def getPCUVolume(self): | 186 def getPCUVolume(self): |
| 168 '''Returns the passenger-car equivalent for the input volume''' | 187 '''Returns the passenger-car equivalent for the input volume''' |
| 169 v = 0 | 188 v = 0 |
| 170 for p, e in zip(self.proportions, self.equivalents): | 189 for p, e in zip(self.proportions, self.equivalents): |
| 180 self.mvtEquivalent = mvtEquivalent | 199 self.mvtEquivalent = mvtEquivalent |
| 181 | 200 |
| 182 def getTVUVolume(self): | 201 def getTVUVolume(self): |
| 183 return self.mvtEquivalent*self.volume.getPCUVolume() | 202 return self.mvtEquivalent*self.volume.getPCUVolume() |
| 184 | 203 |
| 185 class IntersectionApproach: # should probably not be used | |
| 186 def __init__(self, leftTurnVolume, throughVolume, rightTurnVolume): | |
| 187 self.leftTurnVolume = leftTurnVolume | |
| 188 self.throughVolume = throughVolume | |
| 189 self.rightTurnVolume = rightTurnVolume | |
| 190 | |
| 191 def getTVUVolume(self, leftTurnEquivalent = 1, throughEquivalent = 1, rightTurnEquivalent = 1): | |
| 192 return self.leftTurnVolume.getPCUVolume()*leftTurnEquivalent+self.throughVolume.getPCUVolume()*throughEquivalent+self.rightTurnVolume.getPCUVolume()*rightTurnEquivalent | |
| 193 | |
| 194 class LaneGroup: | 204 class LaneGroup: |
| 195 '''Class that represents a group of mouvements''' | 205 '''Class that represents a group of mouvements''' |
| 196 | 206 |
| 197 def __init__(self, movements, nLanes): | 207 def __init__(self, movements, nLanes): |
| 198 self.movements = movements | 208 self.movements = movements |
| 201 def getTVUVolume(self): | 211 def getTVUVolume(self): |
| 202 return sum([mvt.getTVUVolume() for mvt in self.movements]) | 212 return sum([mvt.getTVUVolume() for mvt in self.movements]) |
| 203 | 213 |
| 204 def getCharge(self, saturationVolume): | 214 def getCharge(self, saturationVolume): |
| 205 return self.getTVUVolume()/(self.nLanes*saturationVolume) | 215 return self.getTVUVolume()/(self.nLanes*saturationVolume) |
| 206 | |
| 207 def checkProtectedLeftTurn(leftMvt, opposedThroughMvt): | |
| 208 '''Checks if one of the main two conditions on left turn is verified | |
| 209 The lane groups should contain left and through movement''' | |
| 210 return leftMvt.volume >= 200 or leftMvt.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 | |
| 211 | 216 |
| 212 def optimalCycle(lostTime, criticalCharge): | 217 def optimalCycle(lostTime, criticalCharge): |
| 213 return (1.5*lostTime+5)/(1-criticalCharge) | 218 return (1.5*lostTime+5)/(1-criticalCharge) |
| 214 | 219 |
| 215 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): | 220 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): |
| 224 self.phases = phases | 229 self.phases = phases |
| 225 self.lostTime = lostTime | 230 self.lostTime = lostTime |
| 226 self.saturationVolume = saturationVolume | 231 self.saturationVolume = saturationVolume |
| 227 | 232 |
| 228 def computeCriticalCharges(self): | 233 def computeCriticalCharges(self): |
| 229 self.criticalCharges = [] | 234 self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] |
| 230 for phase in self.phases: | |
| 231 self.criticalCharges.append(max([lg.getCharge(self.saturationVolume) for lg in phase])) | |
| 232 self.criticalCharge = sum(self.criticalCharges) | 235 self.criticalCharge = sum(self.criticalCharges) |
| 233 | 236 |
| 234 def computeOptimalCycle(self): | 237 def computeOptimalCycle(self): |
| 235 self.computeCriticalCharges() | 238 self.computeCriticalCharges() |
| 236 self.C = optimalCycle(self.lostTime, self.criticalCharge) | 239 self.C = optimalCycle(self.lostTime, self.criticalCharge) |
| 240 self.computeCriticalCharges() | 243 self.computeCriticalCharges() |
| 241 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) | 244 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) |
| 242 return self.C | 245 return self.C |
| 243 | 246 |
| 244 def computeEffectiveGreen(self): | 247 def computeEffectiveGreen(self): |
| 245 from numpy import round | 248 #from numpy import round |
| 246 #self.computeCycle() # in case it was not done before | 249 #self.computeCycle() # in case it was not done before |
| 247 effectiveGreenTime = self.C-self.lostTime | 250 effectiveGreenTime = self.C-self.lostTime |
| 248 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] | 251 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] |
| 249 return self.effectiveGreens | 252 return self.effectiveGreens |
| 250 | 253 |
| 261 | 264 |
| 262 def uniformDelay(cycleLength, effectiveGreen, saturationDegree): | 265 def uniformDelay(cycleLength, effectiveGreen, saturationDegree): |
| 263 '''Computes the uniform delay''' | 266 '''Computes the uniform delay''' |
| 264 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength) | 267 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength) |
| 265 | 268 |
| 269 def overflowDelay(T, X, c, k=0.5, I=1): | |
| 270 '''Computes the overflow delay (HCM) | |
| 271 T in hours | |
| 272 c capacity of the lane group | |
| 273 k default for fixed time signal | |
| 274 I=1 for isolated intersection (Poisson arrival)''' | |
| 275 from math import sqrt | |
| 276 return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T))) | |
| 277 | |
| 266 ######################### | 278 ######################### |
| 267 # misc | 279 # misc |
| 268 ######################### | 280 ######################### |
| 269 | 281 |
| 270 def timeChangingSpeed(v0, vf, a, TPR): | 282 def timeChangingSpeed(v0, vf, a, TPR): |
