Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/traffic_engineering.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Fri, 15 Jun 2018 11:19:10 -0400 |
| parents | python/traffic_engineering.py@2cd1ce245024 |
| children | c6cf75a2ed08 |
comparison
equal
deleted
inserted
replaced
| 1027:6129296848d3 | 1028:cc5cb04b04b0 |
|---|---|
| 1 #! /usr/bin/env python | |
| 2 ''' Traffic Engineering Tools and Examples''' | |
| 3 | |
| 4 from trafficintelligence import prediction | |
| 5 | |
| 6 from math import ceil | |
| 7 | |
| 8 | |
| 9 ######################### | |
| 10 # Simulation | |
| 11 ######################### | |
| 12 | |
| 13 def generateTimeHeadways(meanTimeHeadway, simulationTime): | |
| 14 '''Generates the time headways between arrivals | |
| 15 given the meanTimeHeadway and the negative exponential distribution | |
| 16 over a time interval of length simulationTime (assumed to be in same time unit as headway''' | |
| 17 from random import expovariate | |
| 18 headways = [] | |
| 19 totalTime = 0 | |
| 20 flow = 1/meanTimeHeadway | |
| 21 while totalTime < simulationTime: | |
| 22 h = expovariate(flow) | |
| 23 headways.append(h) | |
| 24 totalTime += h | |
| 25 return headways | |
| 26 | |
| 27 class RoadUser(object): | |
| 28 '''Simple example of inheritance to plot different road users ''' | |
| 29 def __init__(self, position, velocity): | |
| 30 'Both fields are 2D numpy arrays' | |
| 31 self.position = position.astype(float) | |
| 32 self.velocity = velocity.astype(float) | |
| 33 | |
| 34 def move(self, deltaT): | |
| 35 self.position += deltaT*self.velocity | |
| 36 | |
| 37 def draw(self, init = False): | |
| 38 from matplotlib.pyplot import plot | |
| 39 if init: | |
| 40 self.plotLine = plot(self.position[0], self.position[1], self.getDescriptor())[0] | |
| 41 else: | |
| 42 self.plotLine.set_data(self.position[0], self.position[1]) | |
| 43 | |
| 44 | |
| 45 class PassengerVehicle(RoadUser): | |
| 46 def getDescriptor(self): | |
| 47 return 'dr' | |
| 48 | |
| 49 class Pedestrian(RoadUser): | |
| 50 def getDescriptor(self): | |
| 51 return 'xb' | |
| 52 | |
| 53 class Cyclist(RoadUser): | |
| 54 def getDescriptor(self): | |
| 55 return 'og' | |
| 56 | |
| 57 ######################### | |
| 58 # queueing models | |
| 59 ######################### | |
| 60 | |
| 61 class CapacityReduction(object): | |
| 62 def __init__(self, beta, reductionDuration, demandCapacityRatio = None, demand = None, capacity = None): | |
| 63 '''reduction duration should be positive | |
| 64 demandCapacityRatio is demand/capacity (q/s)''' | |
| 65 if demandCapacityRatio is None and demand is None and capacity is None: | |
| 66 print('Missing too much information (demand, capacity and ratio)') | |
| 67 import sys | |
| 68 sys.exit() | |
| 69 if 0 <= beta < 1: | |
| 70 self.beta = beta | |
| 71 self.reductionDuration = reductionDuration | |
| 72 | |
| 73 if demandCapacityRatio is not None: | |
| 74 self.demandCapacityRatio = demandCapacityRatio | |
| 75 if demand is not None: | |
| 76 self.demand = demand | |
| 77 if capacity is not None: | |
| 78 self.capacity = capacity | |
| 79 if capacity is not None and demand is not None: | |
| 80 self.demandCapacityRatio = float(self.demand)/self.capacity | |
| 81 if demand <= beta*capacity: | |
| 82 print('There is no queueing as the demand {} is inferior to the reduced capacity {}'.format(demand, beta*capacity)) | |
| 83 else: | |
| 84 print('reduction coefficient (beta={}) is not in [0, 1['.format(beta)) | |
| 85 | |
| 86 def queueingDuration(self): | |
| 87 return self.reductionDuration*(1-self.beta)/(1-self.demandCapacityRatio) | |
| 88 | |
| 89 def nArrived(self, t): | |
| 90 if self.demand is None: | |
| 91 print('Missing demand field') | |
| 92 return None | |
| 93 return self.demand*t | |
| 94 | |
| 95 def nServed(self, t): | |
| 96 if self.capacity is None: | |
| 97 print('Missing capacity field') | |
| 98 return None | |
| 99 if 0<=t<=self.reductionDuration: | |
| 100 return self.beta*self.capacity*t | |
| 101 elif self.reductionDuration < t <= self.queueingDuration(): | |
| 102 return self.beta*self.capacity*self.reductionDuration+self.capacity*(t-self.reductionDuration) | |
| 103 | |
| 104 def nQueued(self, t): | |
| 105 return self.nArrived(t)-self.nServed(t) | |
| 106 | |
| 107 def maxNQueued(self): | |
| 108 return self.nQueued(self.reductionDuration) | |
| 109 | |
| 110 def totalDelay(self): | |
| 111 if self.capacity is None: | |
| 112 print('Missing capacity field') | |
| 113 return None | |
| 114 return self.capacity*self.reductionDuration**2*(1-self.beta)*(self.demandCapacityRatio-self.beta)/(2*(1-self.demandCapacityRatio)) | |
| 115 | |
| 116 def averageDelay(self): | |
| 117 return self.reductionDuration*(self.demandCapacityRatio-self.beta)/(2*self.demandCapacityRatio) | |
| 118 | |
| 119 def averageNQueued(self): | |
| 120 return self.totalDelay()/self.queueingDuration() | |
| 121 | |
| 122 | |
| 123 ######################### | |
| 124 # fundamental diagram | |
| 125 ######################### | |
| 126 | |
| 127 class FundamentalDiagram(object): | |
| 128 ''' ''' | |
| 129 def __init__(self, name): | |
| 130 self.name = name | |
| 131 | |
| 132 def q(self, k): | |
| 133 return k*self.v(k) | |
| 134 | |
| 135 @staticmethod | |
| 136 def meanHeadway(k): | |
| 137 return 1/k | |
| 138 | |
| 139 @staticmethod | |
| 140 def meanSpacing(q): | |
| 141 return 1/q | |
| 142 | |
| 143 def plotVK(self, language='fr', units={}): | |
| 144 from numpy import arange | |
| 145 from matplotlib.pyplot import figure,plot,xlabel,ylabel | |
| 146 densities = [k for k in arange(1, self.kj+1)] | |
| 147 figure() | |
| 148 plot(densities, [self.v(k) for k in densities]) | |
| 149 xlabel('Densite (veh/km)') # todo other languages and adapt to units | |
| 150 ylabel('Vitesse (km/h)') | |
| 151 | |
| 152 def plotQK(self, language='fr', units={}): | |
| 153 from numpy import arange | |
| 154 from matplotlib.pyplot import figure,plot,xlabel,ylabel | |
| 155 densities = [k for k in arange(1, self.kj+1)] | |
| 156 figure() | |
| 157 plot(densities, [self.q(k) for k in densities]) | |
| 158 xlabel('Densite (veh/km)') # todo other languages and adapt to units | |
| 159 ylabel('Debit (km/h)') | |
| 160 | |
| 161 class GreenbergFD(FundamentalDiagram): | |
| 162 '''Speed is the logarithm of density''' | |
| 163 def __init__(self, vc, kj): | |
| 164 FundamentalDiagram.__init__(self,'Greenberg') | |
| 165 self.vc=vc | |
| 166 self.kj=kj | |
| 167 | |
| 168 def v(self,k): | |
| 169 from numpy import log | |
| 170 return self.vc*log(self.kj/k) | |
| 171 | |
| 172 def criticalDensity(self): | |
| 173 from numpy import e | |
| 174 self.kc = self.kj/e | |
| 175 return self.kc | |
| 176 | |
| 177 def capacity(self): | |
| 178 self.qmax = self.kc*self.vc | |
| 179 return self.qmax | |
| 180 | |
| 181 ######################### | |
| 182 # intersection | |
| 183 ######################### | |
| 184 | |
| 185 class FourWayIntersection(object): | |
| 186 '''Simple class for simple intersection outline''' | |
| 187 def __init__(self, dimension, coordX, coordY): | |
| 188 self.dimension = dimension | |
| 189 self.coordX = coordX | |
| 190 self.coordY = coordY | |
| 191 | |
| 192 def plot(self, options = 'k'): | |
| 193 from matplotlib.pyplot import plot, axis | |
| 194 | |
| 195 minX = min(self.dimension[0]) | |
| 196 maxX = max(self.dimension[0]) | |
| 197 minY = min(self.dimension[1]) | |
| 198 maxY = max(self.dimension[1]) | |
| 199 | |
| 200 plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options) | |
| 201 plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options) | |
| 202 plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options) | |
| 203 plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options) | |
| 204 axis('equal') | |
| 205 | |
| 206 ######################### | |
| 207 # traffic signals | |
| 208 ######################### | |
| 209 | |
| 210 class Volume(object): | |
| 211 '''Class to represent volumes with varied vehicule types ''' | |
| 212 def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1): | |
| 213 '''mvtEquivalent is the equivalent if the movement is right of left turn''' | |
| 214 | |
| 215 # check the sizes of the lists | |
| 216 if sum(proportions) == 1: | |
| 217 self.volume = volume | |
| 218 self.types = types | |
| 219 self.proportions = proportions | |
| 220 self.equivalents = equivalents | |
| 221 self.nLanes = nLanes | |
| 222 else: | |
| 223 print('Proportions do not sum to 1') | |
| 224 pass | |
| 225 | |
| 226 def checkProtected(self, opposedThroughMvt): | |
| 227 '''Checks if this left movement should be protected, | |
| 228 ie if one of the main two conditions on left turn is verified''' | |
| 229 return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 | |
| 230 | |
| 231 def getPCUVolume(self): | |
| 232 '''Returns the passenger-car equivalent for the input volume''' | |
| 233 v = 0 | |
| 234 for p, e in zip(self.proportions, self.equivalents): | |
| 235 v += p*e | |
| 236 return v*self.volume | |
| 237 | |
| 238 class IntersectionMovement(object): | |
| 239 '''Represents an intersection movement | |
| 240 with a volume, a type (through, left or right) | |
| 241 and an equivalent for movement type''' | |
| 242 def __init__(self, volume, mvtEquivalent = 1): | |
| 243 self.volume = volume | |
| 244 self.mvtEquivalent = mvtEquivalent | |
| 245 | |
| 246 def getTVUVolume(self): | |
| 247 return self.mvtEquivalent*self.volume.getPCUVolume() | |
| 248 | |
| 249 class LaneGroup(object): | |
| 250 '''Class that represents a group of mouvements''' | |
| 251 | |
| 252 def __init__(self, movements, nLanes): | |
| 253 self.movements = movements | |
| 254 self.nLanes = nLanes | |
| 255 | |
| 256 def getTVUVolume(self): | |
| 257 return sum([mvt.getTVUVolume() for mvt in self.movements]) | |
| 258 | |
| 259 def getCharge(self, saturationVolume): | |
| 260 return self.getTVUVolume()/(self.nLanes*saturationVolume) | |
| 261 | |
| 262 def optimalCycle(lostTime, criticalCharge): | |
| 263 return (1.5*lostTime+5)/(1-criticalCharge) | |
| 264 | |
| 265 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): | |
| 266 'degree of saturation can be used as the peak hour factor too' | |
| 267 return lostTime/(1-criticalCharge/degreeSaturation) | |
| 268 | |
| 269 class Cycle(object): | |
| 270 '''Class to compute optimal cycle and the split of effective green times''' | |
| 271 def __init__(self, phases, lostTime, saturationVolume): | |
| 272 '''phases is a list of phases | |
| 273 a phase is a list of lanegroups''' | |
| 274 self.phases = phases | |
| 275 self.lostTime = lostTime | |
| 276 self.saturationVolume = saturationVolume | |
| 277 | |
| 278 def computeCriticalCharges(self): | |
| 279 self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] | |
| 280 self.criticalCharge = sum(self.criticalCharges) | |
| 281 | |
| 282 def computeOptimalCycle(self): | |
| 283 self.computeCriticalCharges() | |
| 284 self.C = optimalCycle(self.lostTime, self.criticalCharge) | |
| 285 return self.C | |
| 286 | |
| 287 def computeMinimumCycle(self, degreeSaturation=1.): | |
| 288 self.computeCriticalCharges() | |
| 289 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) | |
| 290 return self.C | |
| 291 | |
| 292 def computeEffectiveGreen(self): | |
| 293 #from numpy import round | |
| 294 #self.computeCycle() # in case it was not done before | |
| 295 effectiveGreenTime = self.C-self.lostTime | |
| 296 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] | |
| 297 return self.effectiveGreens | |
| 298 | |
| 299 | |
| 300 def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3): | |
| 301 '''Computes the intergreen time (yellow/amber plus all red time) | |
| 302 Deceleration is positive | |
| 303 All variables should be in the same units''' | |
| 304 if deceleration > 0: | |
| 305 return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed] | |
| 306 else: | |
| 307 print('Issue deceleration should be strictly positive') | |
| 308 return None | |
| 309 | |
| 310 def uniformDelay(cycleLength, effectiveGreen, saturationDegree): | |
| 311 '''Computes the uniform delay''' | |
| 312 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)**2/(1-float(effectiveGreen*saturationDegree)/cycleLength) | |
| 313 | |
| 314 def randomDelay(volume, saturationDegree): | |
| 315 '''Computes the random delay = queueing time for M/D/1''' | |
| 316 return saturationDegree**2/(2*volume*(1-saturationDegree)) | |
| 317 | |
| 318 def incrementalDelay(T, X, c, k=0.5, I=1): | |
| 319 '''Computes the incremental delay (HCM) | |
| 320 T in hours | |
| 321 c capacity of the lane group | |
| 322 k default for fixed time signal | |
| 323 I=1 for isolated intersection (Poisson arrival)''' | |
| 324 from math import sqrt | |
| 325 return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T))) | |
| 326 | |
| 327 ######################### | |
| 328 # misc | |
| 329 ######################### | |
| 330 | |
| 331 def timeChangingSpeed(v0, vf, a, TPR): | |
| 332 'for decelerations, a < 0' | |
| 333 return TPR-(vf-v0)/a | |
| 334 | |
| 335 def distanceChangingSpeed(v0, vf, a, TPR): | |
| 336 'for decelerations, a < 0' | |
| 337 return TPR*v0-(vf**2-v0**2)/(2*a) |
