Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/indicators.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Fri, 15 Jun 2018 11:19:10 -0400 |
| parents | python/indicators.py@933670761a57 |
| children | c6cf75a2ed08 |
comparison
equal
deleted
inserted
replaced
| 1027:6129296848d3 | 1028:cc5cb04b04b0 |
|---|---|
| 1 #! /usr/bin/env python | |
| 2 '''Class for indicators, temporal indicators, and safety indicators''' | |
| 3 | |
| 4 from trafficintelligence import moving | |
| 5 #import matplotlib.nxutils as nx | |
| 6 from matplotlib.pyplot import plot, ylim | |
| 7 from matplotlib.pylab import find | |
| 8 from numpy import array, arange, mean, floor, mean | |
| 9 from scipy import percentile | |
| 10 | |
def multivariateName(indicatorNames):
    '''Returns the name of a multivariate indicator:
    the names of its components joined by underscores'''
    separator = '_'
    return separator.join(indicatorNames)
| 13 | |
| 14 # need for a class representing the indicators, their units, how to print them in graphs... | |
class TemporalIndicator(object):
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
    * or a list with a time interval object if continuous measurements

    Internally the values are always stored in a dict {instant: value}

    it should have more information like name, unit'''

    def __init__(self, name, values, timeInterval = None, maxValue = None):
        '''name: name of the indicator
        values: dict {instant: value} if timeInterval is None (the dict is stored as is, not copied),
        otherwise a sequence with one value per instant of timeInterval
        timeInterval: time interval covered by values when values is a sequence
        (must provide length() and indexing)
        maxValue: if not None, upper limit set on the y axis by plot'''
        self.name = name
        if timeInterval is None:
            self.values = values
            instants = sorted(self.values.keys())
            if len(instants) > 0:
                # the interval spans from the first to the last instant with a value
                self.timeInterval = moving.TimeInterval(instants[0], instants[-1])
            else:
                self.timeInterval = moving.TimeInterval()
        else:
            assert len(values) == timeInterval.length()
            self.timeInterval = timeInterval
            self.values = {}
            for i in range(int(round(self.timeInterval.length()))):
                self.values[self.timeInterval[i]] = values[i]
        self.maxValue = maxValue

    def __len__(self):
        'Returns the number of instants that have a value'
        return len(self.values)

    def empty(self):
        'Returns True if the indicator has no value'
        return len(self.values) == 0

    def __getitem__(self, t):
        'Returns the value at time t (None if there is no value at t)'
        return self.values.get(t)

    def getIthValue(self, i):
        '''Returns the value at the ith instant (instants in increasing order)
        or None if i is not in [0, number of values['''
        sortedKeys = sorted(self.values.keys())
        if 0<=i<len(sortedKeys):
            return self.values[sortedKeys[i]]
        else:
            return None

    def __iter__(self):
        '''Starts an iteration over the values, in increasing order of instants

        The iteration state is stored in the indicator itself:
        two iterations over the same indicator cannot be interleaved'''
        # the instants are sorted once here, not once per returned value
        self._iterInstants = sorted(self.values.keys())
        self.iterInstantNum = 0 # index in the sorted instants
        return self

    def __next__(self):
        if self.iterInstantNum >= len(self._iterInstants):
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self.values[self._iterInstants[self.iterInstantNum-1]]

    def getTimeInterval(self):
        return self.timeInterval

    def getName(self):
        return self.name

    def getValues(self):
        '''Returns the list of values for each instant of the time interval
        (None for the instants without a value)'''
        return [self[t] for t in self.timeInterval]

    def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs):
        '''Plots the values as a function of time

        options: matplotlib format string
        xfactor, yfactor: the (shifted) instants and the values are divided by these factors
        timeShift: added to the instants before dividing by xfactor
        kwargs: passed to the matplotlib plot function'''
        if self.getTimeInterval().length() == 1:
            marker = 'o' # a single point would be invisible without a marker
        else:
            marker = ''
        time = sorted(self.values.keys())
        plot([(x+timeShift)/xfactor for x in time], [self.values[i]/yfactor for i in time], options+marker, **kwargs)
        # explicit comparison to None so that a maximum value of 0 is applied
        if self.maxValue is not None:
            ylim(ymax = self.maxValue)

    @classmethod
    def createMultivariate(cls, indicators):
        '''Creates a new temporal indicator where the value at each instant is a list
        of the indicator values at the instant, in the same order
        the time interval will be the union of the time intervals of the indicators
        name is concatenation of the indicator names

        Instants at which no indicator has a value are skipped
        Returns None (after printing an error message) if there are fewer than 2 indicators'''
        if len(indicators) < 2:
            print('Error creating multivariate indicator with only {} indicator'.format(len(indicators)))
            return None

        timeInterval = moving.TimeInterval.unionIntervals([indic.getTimeInterval() for indic in indicators])
        values = {}
        for t in timeInterval:
            tmpValues = [indic[t] for indic in indicators]
            # keep the instant if at least one indicator has a value
            # (no set is built, so the values do not need to be hashable)
            if any(v is not None for v in tmpValues):
                values[t] = tmpValues
        return cls(multivariateName([indic.name for indic in indicators]), values)
| 108 | |
| 109 # TODO static method taking a class as parameter to build aggregated indicators, with a list of values per instant | |
| 110 | |
def l1Distance(x, y): # lambda x,y:abs(x-y)
    '''Returns the absolute difference between x and y,
    or infinity if either of them is None (missing value)'''
    if x is not None and y is not None:
        return abs(x-y)
    return float('inf')
| 116 | |
def multiL1Matching(x, y, thresholds, proportionMatching=1.):
    '''Returns True if x and y match in a sufficient proportion of their dimensions

    Dimension i matches if the L1 distance between x[i] and y[i]
    is at most thresholds[i]
    proportionMatching is the minimum proportion of the dimensions of x that must match'''
    nDimensions = len(x)
    nMatches = sum(1 for i in range(nDimensions) if l1Distance(x[i], y[i]) <= thresholds[i])
    return nMatches >= nDimensions*proportionMatching
| 124 | |
# utils is imported from the trafficintelligence package, like moving at the top of the file
# (a top-level utils module is not available once the package is installed)
try:
    from trafficintelligence.utils import LCSS as utilsLCSS
except ImportError:
    # previous flat layout, with utils.py directly on the Python path
    from utils import LCSS as utilsLCSS

class LCSS(utilsLCSS):
    '''Adapted LCSS class for indicators, same pattern

    The indicators are checked before any computation:
    an indicator that is None or has fewer than minLength values
    yields the default result of each method'''
    def __init__(self, similarityFunc, delta = float('inf'), minLength = 0, aligned = False, lengthFunc = min):
        '''similarityFunc, delta, aligned and lengthFunc are passed to the base LCSS class
        minLength is the minimum number of values an indicator must have to be compared'''
        utilsLCSS.__init__(self, similarityFunc = similarityFunc, delta = delta, aligned = aligned, lengthFunc = lengthFunc)
        self.minLength = minLength

    def checkIndicator(self, indicator):
        'Returns True if the indicator exists and has at least minLength values'
        return indicator is not None and len(indicator) >= self.minLength

    def compute(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of the base class _compute on the values of the two indicators
        or 0 if either indicator fails checkIndicator'''
        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
            return self._compute(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        else:
            return 0

    def computeNormalized(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of the base class _computeNormalized on the values of the two indicators
        or 0. if either indicator fails checkIndicator'''
        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
            return self._computeNormalized(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        else:
            return 0.

    def computeDistance(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of the base class _computeDistance on the values of the two indicators
        or 1. if either indicator fails checkIndicator'''
        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
            return self._computeDistance(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        else:
            return 1.
| 153 | |
class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators
    field mostSevereIsMax is True
    if the most severe value taken by the indicator is the maximum'''

    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None):
        TemporalIndicator.__init__(self, name, values, timeInterval, maxValue)
        self.mostSevereIsMax = mostSevereIsMax

    def getMostSevereValue(self, minNInstants=1, centile=15.):
        '''Returns a summary of the most severe values of the indicator,
        or None if the indicator is empty or has fewer than minNInstants values

        if centile is not None, returns the centile counted from the most severe side:
        the centile-th percentile if the most severe value is the minimum,
        the (100-centile)-th percentile if it is the maximum
        eg for TTC, 15 returns the 15th centile (value such that 15% of observations are lower)

        if centile is None, returns the average of the minNInstants most severe values'''
        # an empty indicator has no most severe value, whatever minNInstants
        if self.empty() or len(self) < minNInstants:
            return None
        values = list(self.values.values())
        if centile is not None:
            if self.mostSevereIsMax:
                c = 100-centile
            else:
                c = centile
            return percentile(values, c)
        else:
            values = sorted(values, reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values
            return mean(values[:minNInstants])

    def getInstantOfMostSevereValue(self):
        '''Returns the instant at which the indicator reaches its most severe value
        or None if the indicator is empty'''
        if self.empty():
            return None
        if self.mostSevereIsMax:
            return max(self.values, key=self.values.get)
        else:
            return min(self.values, key=self.values.get)
| 189 | |
| 190 # functions to aggregate discretized maps of indicators | |
| 191 # TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) | |
| 192 | |
def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary whose keys are the indices (i,j) of the square cells
    of side squareSize in which the trajectory positions are located
    and whose values are the mean of the indicator values
    observed at the positions falling in each cell

    indicatorValues must have one value per position of the trajectory

    ex: speeds and trajectory'''

    assert len(indicatorValues) == trajectory.length()
    valuesPerCell = {}
    for k in range(trajectory.length()):
        position = trajectory[k]
        cell = (floor(position.x/squareSize), floor(position.y/squareSize))
        # gather all the values observed in the same cell
        valuesPerCell.setdefault(cell, []).append(indicatorValues[k])
    return {cell: mean(cellValues) for cell, cellValues in valuesPerCell.items()}
| 214 | |
| 215 # def indicatorMapFromPolygon(value, polygon, squareSize): | |
| 216 # '''Fills an indicator map with the value within the polygon | |
| 217 # (array of Nx2 coordinates of the polygon vertices)''' | |
| 218 # points = [] | |
| 219 # for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): | |
| 220 # for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): | |
| 221 # points.append([x,y]) | |
| 222 # inside = nx.points_inside_poly(array(points), polygon) | |
| 223 # indicatorMap = {} | |
| 224 # for i in range(len(inside)): | |
| 225 # if inside[i]: | |
| 226 # indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 | |
| 227 # return indicatorMap | |
| 228 | |
def indicatorMapFromAxis(value, limits, squareSize):
    '''Returns an indicator map with the same value in all the cells visited
    when stepping by squareSize through the rectangle described by limits

    limits = [xmin, xmax, ymin, ymax] (upper bounds excluded)'''
    return {(floor(x/squareSize), floor(y/squareSize)): value
            for x in arange(limits[0], limits[1], squareSize)
            for y in arange(limits[2], limits[3], squareSize)}
| 236 | |
def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together

    The values found for the same cell in the different maps are collected in a list
    (in the order of the maps) and reduced to a single value
    by combinationFunction (eg a mean to average them)

    squareSize is not used'''
    valuesPerCell = {}
    for singleMap in maps:
        for cell, cellValue in singleMap.items():
            valuesPerCell.setdefault(cell, []).append(cellValue)
    return {cell: combinationFunction(cellValues) for cell, cellValues in valuesPerCell.items()}
| 251 | |
if __name__ == "__main__":
    # when run as a script, execute the doctests of tests/indicators.txt
    # through the unittest text runner
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/indicators.txt')
    runner = unittest.TextTestRunner()
    runner.run(suite)
| 257 # #doctest.testmod() | |
| 258 # #doctest.testfile("example.txt") |
