Mercurial > hg > nsaunier > traffic-intelligence
comparison python/prediction.py @ 945:05d4302bf67e
working motion pattern prediction with rotation and features
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Thu, 20 Jul 2017 14:29:46 -0400 |
| parents | 84ebe1b031f1 |
| children | e5970606066f |
comparison
equal
deleted
inserted
replaced
| 944:84ebe1b031f1 | 945:05d4302bf67e |
|---|---|
| 3 | 3 |
| 4 import moving | 4 import moving |
| 5 from utils import LCSS | 5 from utils import LCSS |
| 6 | 6 |
| 7 import math, random | 7 import math, random |
| 8 from copy import copy | |
| 8 import numpy as np | 9 import numpy as np |
| 9 from multiprocessing import Pool | 10 from multiprocessing import Pool |
| 10 | 11 |
| 11 | 12 |
| 12 class PredictedTrajectory(object): | 13 class PredictedTrajectory(object): |
| 72 self.prototype = prototype | 73 self.prototype = prototype |
| 73 self.constantSpeed = constantSpeed | 74 self.constantSpeed = constantSpeed |
| 74 self.probability = probability | 75 self.probability = probability |
| 75 self.predictedPositions = {0: initialPosition} | 76 self.predictedPositions = {0: initialPosition} |
| 76 self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) | 77 self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) |
| 77 self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) | 78 self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position |
| 79 self.theta = prototype.getVelocityAt(self.closestPointIdx).angle() | |
| 78 self.initialSpeed = initialVelocity.norm2() | 80 self.initialSpeed = initialVelocity.norm2() |
| 79 if not constantSpeed: | 81 if not constantSpeed: |
| 80 self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() | 82 self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() |
| 81 | 83 |
| 82 def predictPosition(self, nTimeSteps): | 84 def predictPosition(self, nTimeSteps): |
| 83 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): | 85 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): |
| 86 deltaPosition = copy(self.deltaPosition) | |
| 84 if self.constantSpeed: | 87 if self.constantSpeed: |
| 85 traj = self.prototype.getPositions() | 88 traj = self.prototype.getPositions() |
| 86 trajLength = traj.length() | 89 trajLength = traj.length() |
| 87 traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx) | 90 traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx) |
| 88 i = self.closestPointIdx | 91 i = self.closestPointIdx |
| 89 while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance: | 92 while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance: |
| 90 i += 1 | 93 i += 1 |
| 91 if i == trajLength: | 94 if i == trajLength: |
| 92 v = self.prototype.getVelocityAt(-1) | 95 v = self.prototype.getVelocityAt(-1) |
| 93 self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) | 96 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) |
| 94 else: | 97 else: |
| 95 self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) | 98 v = self.prototype.getVelocityAt(i-1) |
| 99 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) | |
| 96 else: | 100 else: |
| 97 traj = self.prototype.getPositions() | 101 traj = self.prototype.getPositions() |
| 98 trajLength = traj.length() | 102 trajLength = traj.length() |
| 99 nSteps = self.ratio*nTimeSteps+self.closestPointIdx | 103 nSteps = self.ratio*nTimeSteps+self.closestPointIdx |
| 100 i = int(np.floor(nSteps)) | 104 i = int(np.floor(nSteps)) |
| 101 if nSteps < trajLength-1: | 105 if nSteps < trajLength-1: |
| 102 self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) | 106 v = self.prototype.getVelocityAt(i) |
| 107 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) | |
| 103 else: | 108 else: |
| 104 v = self.prototype.getVelocityAt(-1) | 109 v = self.prototype.getVelocityAt(-1) |
| 105 self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[-1]+v*(nSteps-trajLength+1) | 110 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) |
| 106 return self.predictedPositions[nTimeSteps] | 111 return self.predictedPositions[nTimeSteps] |
| 107 | 112 |
| 108 class PredictedTrajectoryRandomControl(PredictedTrajectory): | 113 class PredictedTrajectoryRandomControl(PredictedTrajectory): |
| 109 '''Random vehicle control: suitable for normal adaptation''' | 114 '''Random vehicle control: suitable for normal adaptation''' |
| 110 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | 115 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): |
| 550 self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance) | 555 self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance) |
| 551 self.minSimilarity = minSimilarity | 556 self.minSimilarity = minSimilarity |
| 552 self.minFeatureTime = minFeatureTime | 557 self.minFeatureTime = minFeatureTime |
| 553 self.constantSpeed = constantSpeed | 558 self.constantSpeed = constantSpeed |
| 554 self.useFeatures = useFeatures | 559 self.useFeatures = useFeatures |
| 560 | |
| 561 def addPredictedTrajectories(self, predictedTrajectories, obj, instant): | |
| 562 if not hasattr(obj, 'prototypeSimilarities'): | |
| 563 obj.prototypeSimilarities = [] | |
| 564 for proto in self.prototypes: | |
| 565 self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) | |
| 566 similarities = self.lcss.similarityTable[-1, :-1].astype(float) | |
| 567 obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) | |
| 568 for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): | |
| 569 if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: | |
| 570 initialPosition = obj.getPositionAtInstant(instant) | |
| 571 initialVelocity = obj.getVelocityAtInstant(instant) | |
| 572 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) | |
| 555 | 573 |
| 556 def generatePredictedTrajectories(self, obj, instant): | 574 def generatePredictedTrajectories(self, obj, instant): |
| 557 predictedTrajectories = [] | 575 predictedTrajectories = [] |
| 558 if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: | 576 if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: |
| 559 if self.useFeatures and obj.hasFeatures(): | 577 if self.useFeatures and obj.hasFeatures(): |
| 560 # get current features existing for the most time, sort on first instant of feature and take n first | 578 # get current features existing for the most time, sort on first instant of feature and take n first |
| 561 pass | 579 firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant)] |
| 580 firstInstants.sort(key = lambda t: t[1]) | |
| 581 for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants))]: | |
| 582 self.addPredictedTrajectories(predictedTrajectories, f, instant) | |
| 562 else: | 583 else: |
| 563 if not hasattr(obj, 'prototypeSimilarities'): | 584 self.addPredictedTrajectories(predictedTrajectories, obj, instant) |
| 564 obj.prototypeSimilarities = [] | |
| 565 for proto in self.prototypes: | |
| 566 self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) | |
| 567 similarities = self.lcss.similarityTable[-1, :-1].astype(float) | |
| 568 obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) | |
| 569 for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): | |
| 570 if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: | |
| 571 initialPosition = obj.getPositionAtInstant(instant) | |
| 572 initialVelocity = obj.getVelocityAtInstant(instant) | |
| 573 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) | |
| 574 return predictedTrajectories | 585 return predictedTrajectories |
| 575 | 586 |
| 576 if __name__ == "__main__": | 587 if __name__ == "__main__": |
| 577 import doctest | 588 import doctest |
| 578 import unittest | 589 import unittest |
