Mercurial > hg > nsaunier > traffic-intelligence
comparison python/events.py @ 299:7e5fb4abd070
renaming event to events and correcting errors in indicator computation
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Tue, 12 Feb 2013 18:08:00 -0500 |
| parents | python/event.py@1f253f218b9f |
| children | 93d851d0d21e |
comparison
equal
deleted
inserted
replaced
| 298:4a8b6a2de82f | 299:7e5fb4abd070 |
|---|---|
| 1 #! /usr/bin/env python | |
| 2 '''Libraries for events | |
| 3 Interactions, pedestrian crossing...''' | |
| 4 | |
| 5 import numpy as np | |
| 6 | |
| 7 import multiprocessing | |
| 8 import itertools | |
| 9 | |
| 10 import moving | |
| 11 import prediction | |
| 12 import indicators | |
| 13 | |
| 14 __metaclass__ = type | |
| 15 | |
class Interaction(moving.STObject):
    '''Interaction between two road users, or between a road user and an obstacle

    Holds references to the two moving objects and stores the indicators
    in a dictionary, using the indicator names as keys
    '''

    # category name -> category number
    categories = dict(headon = 0,
                      rearend = 1,
                      side = 2,
                      parallel = 3)

    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None):
        '''Stores the numbers of the two road users (as a set), the two moving objects
        and the category number; the indicator dictionary starts empty'''
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNumbers = {roaduserNum1, roaduserNum2}
        self.movingObject1, self.movingObject2 = movingObject1, movingObject2
        self.categoryNum = categoryNum
        self.indicators = {}

    def getIndicator(self, indicatorName):
        '''Returns the indicator stored under indicatorName
        (KeyError if no indicator of that name was added)'''
        return self.indicators[indicatorName]

    def addIndicator(self, indicator):
        '''Stores the indicator under its name attribute,
        replacing any indicator previously stored under the same name'''
        self.indicators[indicator.name] = indicator

    def computeIndicators(self):
        '''Computes, for each instant of the time interval of the interaction,
        the collision course dot product, the distance and the speed differential
        of the two moving objects

        The collision course cosine is computed only at the instants
        where the dot product is positive

        The four results are added as SeverityIndicator
        (dictionaries with the instants as keys)'''
        dotProducts = {}
        cosines = {}
        distances = {}
        speedDifferentials = {}
        first, second = self.movingObject1, self.movingObject2
        for t in self.timeInterval:
            # position of the first object relative to the second,
            # velocity of the second object relative to the first
            deltap = first.getPositionAtInstant(t)-second.getPositionAtInstant(t)
            deltav = second.getVelocityAtInstant(t)-first.getVelocityAtInstant(t)
            dotProduct = moving.Point.dot(deltap, deltav)
            distance = deltap.norm2()
            speedDifferential = deltav.norm2()
            dotProducts[t] = dotProduct
            distances[t] = distance
            speedDifferentials[t] = speedDifferential
            if dotProduct > 0:
                cosines[t] = dotProduct/(distance*speedDifferential)
        # todo shorten the time intervals based on the interaction definition
        # todo change cosines to angles
        for name, values in (('Collision Course Dot Product', dotProducts),
                             ('Distance', distances),
                             ('Speed Differential', speedDifferentials),
                             ('Collision Course Cosine', cosines)):
            self.addIndicator(indicators.SeverityIndicator(name, values))
| 63 | |
| 64 | |
def createInteractions(objects):
    '''Builds the list of interactions for all pairs of objects that co-exist,
    ie whose common time interval is not empty

    Each object is paired with the objects that come before it in the list;
    the interactions are numbered from 0 in order of creation

    todo add test to compute categories?'''
    interactions = []
    num = 0
    for i, current in enumerate(objects):
        for j in range(i):
            previous = objects[j]
            overlap = current.commonTimeInterval(previous)
            if overlap.empty():
                # the two objects never exist at the same time
                continue
            interactions.append(Interaction(num, overlap, current.num, previous.num, current, previous))
            num += 1
    return interactions
| 78 | |
| 79 | |
| 80 # TODO: | |
| 81 #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class | |
| 82 #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 | |
def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):
    '''Computes the collision points and crossing zones of one interaction
    (module-level function so that it can be used with multiprocessing, see the links above)

    pairs is a single interaction (despite its name): its two moving objects are
    passed to prediction.computeCrossingsCollisions with the prediction parameters

    Sets on the interaction, then returns it:
    - CP and CZ: the collision points and crossing zones as returned by the prediction
    - hasCP and hasCZ: 1 if at least one of the values of CP (resp. CZ) is not an empty list, 0 otherwise'''
    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon)

    # flag interactions for which all the collision points are empty
    pairs.hasCP = int(any(collisionPoints[key] != [] for key in collisionPoints))
    pairs.CP = collisionPoints

    # flag interactions for which all the crossing zones are empty
    pairs.hasCZ = int(any(crossingZones[key] != [] for key in crossingZones))
    pairs.CZ = crossingZones
    return pairs
| 108 | |
def calculateIndicatorPipe_star(a_b):
    '''Adapter for functions that pass a single argument (eg pool.map):
    unpacks the sequence a_b into the positional arguments of calculateIndicatorPipe,
    ie turns f([1,2]) into f(1,2)'''
    arguments = tuple(a_b)
    return calculateIndicatorPipe(*arguments)
| 112 | |
class VehPairs():
    '''Set of interactions (pairs of co-existing objects) created from a list of objects

    Attributes:
    - pairs: list of interactions returned by createInteractions
    - interactionCount: sum of len(pair.CP) over the pairs,
      increased by each call to calculateIndicators (it is never reset)
    - CPcount, CZcount: lengths of getCPlist() and getCZlist()
      at the end of the last call to calculateIndicators'''
    def __init__(self,objects):
        self.pairs = createInteractions(objects)
        self.interactionCount = 0
        self.CPcount = 0
        self.CZcount = 0

    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):
        '''Computes the collision points and crossing zones of all the pairs
        and updates the counts

        If threads > 1, the work is distributed over a pool of that many processes
        and self.pairs is replaced by the list returned by pool.map;
        otherwise the pairs are updated in place, one after the other

        timeHorizon and collisionDistanceThreshold are forwarded
        to calculateIndicatorPipe in both cases'''
        if threads > 1:
            # one argument tuple per pair, in the positional order of calculateIndicatorPipe
            arguments = [(pair, predParam, timeHorizon, collisionDistanceThreshold) for pair in self.pairs]
            pool = multiprocessing.Pool(threads)
            try:
                self.pairs = pool.map(calculateIndicatorPipe_star, arguments)
            finally:
                # release the worker processes even if the computation failed
                pool.close()
                pool.join()
        else:
            for pair in self.pairs:
                # sets hasCP, CP, hasCZ and CZ on the pair
                calculateIndicatorPipe(pair, predParam, timeHorizon, collisionDistanceThreshold)

        for pair in self.pairs:
            self.interactionCount += len(pair.CP)
        self.CPcount = len(self.getCPlist())
        self.CZcount = len(self.getCZlist())

    @staticmethod
    def _firstBelowThreshold(predictions, indicatorThreshold):
        '''Returns the list of [key, first element] for the keys of predictions
        whose list is not empty and whose first element has an indicator
        strictly below indicatorThreshold'''
        selected = []
        for key in predictions:
            elements = predictions[key]
            if elements != [] and elements[0].indicator < indicatorThreshold:
                selected.append([key, elements[0]])
        return selected

    def getPairsWCP(self):
        '''Returns the numbers of the pairs that have at least one collision point'''
        return [pair.num for pair in self.pairs if pair.hasCP]

    def getPairsWCZ(self):
        '''Returns the numbers of the pairs that have at least one crossing zone'''
        return [pair.num for pair in self.pairs if pair.hasCZ]

    def getCPlist(self,indicatorThreshold=99999):
        '''Returns the list of [key, first collision point] over all the pairs with collision points,
        keeping only the collision points with indicator below indicatorThreshold'''
        lists = []
        for pair in self.pairs:
            if pair.hasCP:
                lists.extend(self._firstBelowThreshold(pair.CP, indicatorThreshold))
        return lists

    def getCZlist(self,indicatorThreshold=99999):
        '''Returns the list of [key, first crossing zone] over all the pairs with crossing zones,
        keeping only the crossing zones with indicator below indicatorThreshold'''
        lists = []
        for pair in self.pairs:
            if pair.hasCZ:
                lists.extend(self._firstBelowThreshold(pair.CZ, indicatorThreshold))
        return lists

    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
        '''Computes the histogram of the indicators of the collision points

        CPlist is a list in the format returned by getCPlist (which is called if CPlist is not provided);
        bins is passed to numpy.histogram

        Returns False if there is no collision point,
        otherwise the tuple (counts, bin edges, counts divided by their sum)'''
        if not CPlist:
            CPlist = self.getCPlist()
        if not CPlist:
            return False
        indicatorValues = [entry[1].indicator for entry in CPlist]
        counts, binEdges = np.histogram(indicatorValues, bins=bins)
        return (counts, binEdges, counts.astype(float)/np.sum(counts))
| 204 | |
class Crossing(moving.STObject):
    '''Class for the event of a street crossing

    TODO: detect the passage on the roadway
    identify origins and destination (or only the roadway in the field of view)
    characterize the crossing
    detect the proximity of vehicles (remove if too similar simultaneously)
    characterize the interaction'''

    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
        '''Initializes the base object with num and timeInterval
        and stores the number of the road user who is crossing'''
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNum = roaduserNum
| 217 | |
| 218 | |
| 219 | |
if __name__ == "__main__":
    # when run as a script, collect the doctests of this module
    # and run them with the unittest text runner
    import doctest
    import unittest
    # alternative suite read from a file: doctest.DocFileSuite('tests/moving.txt')
    suite = doctest.DocTestSuite()
    runner = unittest.TextTestRunner()
    runner.run(suite)
| 226 |
