Mercurial > hg > nsaunier > traffic-intelligence
comparison python/event.py @ 293:ee3302528cdc
rearranged new code by Paul (works now)
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Fri, 08 Feb 2013 18:13:29 -0500 |
| parents | 8b2c8a4015f1 |
| children | 1f253f218b9f |
comparison
equal
deleted
inserted
replaced
| 292:8b2c8a4015f1 | 293:ee3302528cdc |
|---|---|
| 1 #! /usr/bin/env python | 1 #! /usr/bin/env python |
| 2 '''Libraries for events | 2 '''Libraries for events |
| 3 Interactions, pedestrian crossing...''' | 3 Interactions, pedestrian crossing...''' |
| 4 | 4 |
| 5 ## Native | 5 import numpy as np |
| 6 | |
| 6 import multiprocessing | 7 import multiprocessing |
| 7 import itertools | 8 import itertools |
| 8 #import utils; | 9 |
| 9 import numpy as np | |
| 10 import moving | 10 import moving |
| 11 import prediction | 11 import prediction |
| 12 | 12 |
| 13 __metaclass__ = type | 13 __metaclass__ = type |
| 14 | |
| 15 class Interaction(moving.STObject): | |
| 16 '''Class for an interaction between two road users | |
| 17 or a road user and an obstacle | |
| 18 | |
| 19 link to the moving objects | |
| 20 ''' | |
| 21 | |
| 22 categories = {'headon': 0, | |
| 23 'rearend': 1, | |
| 24 'side': 2, | |
| 25 'parallel': 3} | |
| 26 | |
| 27 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): | |
| 28 moving.STObject.__init__(self, num, timeInterval) | |
| 29 self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) | |
| 30 self.movingObject1 = movingObject1 | |
| 31 self.movingObject2 = movingObject2 | |
| 32 self.categoryNum = categoryNum | |
| 33 | |
| 34 def getIndicator(self, indicatorName): | |
| 35 if hasattr(self, 'indicators'): | |
| 36 for i in self.indicators: | |
| 37 if i.name == indicatorName: | |
| 38 return i | |
| 39 else: | |
| 40 return None | |
| 41 | |
| 42 def computeIndicators(self): | |
| 43 '''Computes the collision course cosine only if the cosine is positive''' | |
| 44 collisionCourseDotProduct = [0]*int(self.timeInterval.length()) | |
| 45 collisionCourseCosine = {} | |
| 46 distances = [0]*int(self.timeInterval.length()) | |
| 47 for i,instant in enumerate(self.timeInterval): | |
| 48 deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) | |
| 49 deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) | |
| 50 collisionCourseDotProduct[i] = moving.Point.dot(deltap, deltav) | |
| 51 distances[i] = deltap.norm2() | |
| 52 if collisionCourseDotProduct[i] > 0: | |
| 53 collisionCourseCosine[instant] = collisionCourseDotProduct[i]/(distances[i]*deltav.norm2()) | |
| 54 self.indicators = [moving.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProduct, self.timeInterval), | |
| 55 moving.SeverityIndicator('Distances', distances, self.timeInterval), | |
| 56 moving.SeverityIndicator('Collision Course Cosine', collisionCourseCosine)] | |
| 57 | |
| 58 | |
| 59 def createInteractions(objects): | |
| 60 '''Create all interactions of two co-existing road users | |
| 61 | |
| 62 todo add test to compute categories?''' | |
| 63 interactions = [] | |
| 64 num = 0 | |
| 65 for i in xrange(len(objects)): | |
| 66 for j in xrange(i): | |
| 67 commonTimeInterval = objects[i].commonTimeInterval(objects[j]) | |
| 68 if not commonTimeInterval.empty(): | |
| 69 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) | |
| 70 num += 1 | |
| 71 return interactions | |
| 72 | |
| 14 | 73 |
| 15 # TODO: | 74 # TODO: |
| 16 #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class | 75 #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class |
| 17 #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 | 76 #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 |
| 18 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): | 77 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): |
| 44 def calculateIndicatorPipe_star(a_b): | 103 def calculateIndicatorPipe_star(a_b): |
| 45 """Convert `f([1,2])` to `f(1,2)` call.""" | 104 """Convert `f([1,2])` to `f(1,2)` call.""" |
| 46 return calculateIndicatorPipe(*a_b) | 105 return calculateIndicatorPipe(*a_b) |
| 47 | 106 |
| 48 class VehPairs(): | 107 class VehPairs(): |
| 49 # Create a veh-pairs object from objects list | 108 '''Create a veh-pairs object from objects list''' |
| 50 def __init__(self,objects): | 109 def __init__(self,objects): |
| 51 ''' | 110 self.pairs = createInteractions(objects) |
| 52 Create all pairs of two co-existing road users | |
| 53 TODO: add test to compute categories? | |
| 54 ''' | |
| 55 self.pairs = [] | |
| 56 self.interactionCount = 0 | 111 self.interactionCount = 0 |
| 57 self.CPcount = 0 | 112 self.CPcount = 0 |
| 58 self.CZcount = 0 | 113 self.CZcount = 0 |
| 59 num = 0 | |
| 60 for i in xrange(len(objects)): | |
| 61 for j in xrange(i): | |
| 62 commonTimeInterval = objects[i].commonTimeInterval(objects[j]) | |
| 63 if not commonTimeInterval.empty(): | |
| 64 self.pairs.append(event.Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) | |
| 65 num += 1 | |
| 66 | 114 |
| 67 # Process indicator calculation with support for multi-threading | 115 # Process indicator calculation with support for multi-threading |
| 68 def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): | 116 def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): |
| 69 if(threads > 1): | 117 if(threads > 1): |
| 70 pool = multiprocessing.Pool(threads) | 118 pool = multiprocessing.Pool(threads) |
| 100 | 148 |
| 101 for j in self.pairs: | 149 for j in self.pairs: |
| 102 self.interactionCount = self.interactionCount + len(j.CP) | 150 self.interactionCount = self.interactionCount + len(j.CP) |
| 103 self.CPcount = len(self.getCPlist()) | 151 self.CPcount = len(self.getCPlist()) |
| 104 self.Czcount = len(self.getCZlist()) | 152 self.Czcount = len(self.getCZlist()) |
| 105 return | |
| 106 | 153 |
| 107 | 154 |
| 108 def getPairsWCP(self): | 155 def getPairsWCP(self): |
| 109 lists = [] | 156 lists = [] |
| 110 for j in self.pairs: | 157 for j in self.pairs: |
| 147 TTC_list.append(i[1].indicator) | 194 TTC_list.append(i[1].indicator) |
| 148 histo = np.histogram(TTC_list,bins=bins) | 195 histo = np.histogram(TTC_list,bins=bins) |
| 149 histo += (histo[0].astype(float)/np.sum(histo[0]),) | 196 histo += (histo[0].astype(float)/np.sum(histo[0]),) |
| 150 return histo | 197 return histo |
| 151 | 198 |
| 152 class Interaction(moving.STObject): | |
| 153 '''Class for an interaction between two road users | |
| 154 or a road user and an obstacle | |
| 155 | |
| 156 link to the moving objects | |
| 157 ''' | |
| 158 | |
| 159 categories = {'headon': 0, | |
| 160 'rearend': 1, | |
| 161 'side': 2, | |
| 162 'parallel': 3} | |
| 163 | |
| 164 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): | |
| 165 moving.STObject.__init__(self, num, timeInterval) | |
| 166 self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) | |
| 167 self.movingObject1 = movingObject1 | |
| 168 self.movingObject2 = movingObject2 | |
| 169 self.categoryNum = categoryNum | |
| 170 | |
| 171 def getIndicator(self, indicatorName): | |
| 172 if hasattr(self, 'indicators'): | |
| 173 for i in self.indicators: | |
| 174 if i.name == indicatorName: | |
| 175 return i | |
| 176 else: | |
| 177 return None | |
| 178 | |
| 179 def computeIndicators(self): | |
| 180 '''Computes the collision course cosine only if the cosine is positive''' | |
| 181 collisionCourseDotProduct = [0]*int(self.timeInterval.length()) | |
| 182 collisionCourseCosine = {} | |
| 183 distances = [0]*int(self.timeInterval.length()) | |
| 184 for i,instant in enumerate(self.timeInterval): | |
| 185 deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) | |
| 186 deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) | |
| 187 collisionCourseDotProduct[i] = moving.Point.dot(deltap, deltav) | |
| 188 distances[i] = deltap.norm2() | |
| 189 if collisionCourseDotProduct[i] > 0: | |
| 190 collisionCourseCosine[instant] = collisionCourseDotProduct[i]/(distances[i]*deltav.norm2()) | |
| 191 self.indicators = [moving.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProduct, self.timeInterval), | |
| 192 moving.SeverityIndicator('Distances', distances, self.timeInterval), | |
| 193 moving.SeverityIndicator('Collision Course Cosine', collisionCourseCosine)] | |
| 194 | |
| 195 | |
| 196 ######====>BEGIN LEGACY CODE | |
| 197 def createInteractions(objects): | |
| 198 '''Create all interactions of two co-existing road users | |
| 199 | |
| 200 todo add test to compute categories?''' | |
| 201 interactions = [] | |
| 202 num = 0 | |
| 203 for i in xrange(len(objects)): | |
| 204 for j in xrange(i): | |
| 205 commonTimeInterval = objects[i].commonTimeInterval(objects[j]) | |
| 206 if not commonTimeInterval.empty(): | |
| 207 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) | |
| 208 num += 1 | |
| 209 return interactions | |
| 210 #<====END LEGACY CODE##### | |
| 211 | |
| 212 | |
| 213 | |
| 214 class Crossing(moving.STObject): | 199 class Crossing(moving.STObject): |
| 215 '''Class for the event of a street crossing | 200 '''Class for the event of a street crossing |
| 216 | 201 |
| 217 TODO: detecter passage sur la chaussee | 202 TODO: detecter passage sur la chaussee |
| 218 identifier origines et destination (ou uniquement chaussee dans FOV) | 203 identifier origines et destination (ou uniquement chaussee dans FOV) |
