# HG changeset patch # User MohamedGomaa # Date 1405445115 14400 # Node ID 8ba4b8ad4c8642f6749e432547450061cb658e0c # Parent e6ab4caf359c78a100d653733a42cf518be38e9e Motion Pattern Method diff -r e6ab4caf359c -r 8ba4b8ad4c86 python/events.py --- a/python/events.py Tue Jul 08 15:22:30 2014 -0400 +++ b/python/events.py Tue Jul 15 13:25:15 2014 -0400 @@ -1,7 +1,7 @@ #! /usr/bin/env python '''Libraries for events Interactions, pedestrian crossing...''' - +import sys import numpy as np from numpy import arccos @@ -141,6 +141,46 @@ pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) + def computeCrossingsCollisionsPrototype(self, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,nMatching,objects,minSimilarity=0.5,index=None,mostMatched=None, computeCZ = False, debug = False, timeInterval = None,acceptPartialLength=30): + '''Computes all crossing and collision points at each common instant for two road users. ''' + self.collisionPoints={} + self.crossingZones={} + TTCs = {} + route1=(self.roadUser1.startRouteID,self.roadUser1.endRouteID) + route2=(self.roadUser2.startRouteID,self.roadUser2.endRouteID) + + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = self.timeInterval + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + sys.path.append("D:/behaviourAnalysis/libs") + import trajLearning + if i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength: + inter1=moving.Interval(self.roadUser1.timeInterval.first,i) + inter2=moving.Interval(self.roadUser2.timeInterval.first,i) + partialObjPositions1= self.roadUser1.getObjectInTimeInterval(inter1).positions + partialObjPositions2= self.roadUser2.getObjectInTimeInterval(inter2).positions + prototypeTrajectories1=trajLearning.findPrototypes(prototypes,nMatching,objects,route1,partialObjPositions1,minSimilarity,index,mostMatched) + prototypeTrajectories2=trajLearning.findPrototypes(prototypes,nMatching,objects,route2,partialObjPositions2,minSimilarity,index,mostMatched) + if prototypeTrajectories1=={}: + print self.roadUser1.num, 'is abnormal at instant', str(i) + elif prototypeTrajectories2=={}: + print self.roadUser2.num, 'is abnormal at instant', str(i) + else: + self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) + if len(self.collisionPoints[i]) > 0: + TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i]) + # add probability of collision, and probability of successful evasive action + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs)) + + if computeCZ: + pPETs = {} + for i in list(commonTimeInterval)[:-1]: + if i in self.crossingZones.keys() and len(self.crossingZones[i]) > 0: + pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) + def addVideoFilename(self,videoFilename): self.videoFilename= videoFilename diff -r e6ab4caf359c -r 8ba4b8ad4c86 python/prediction.py --- a/python/prediction.py Tue Jul 08 15:22:30 2014 -0400 +++ b/python/prediction.py Tue Jul 15 13:25:15 2014 -0400 @@ -4,6 +4,7 @@ import moving import math import random +import numpy as np class PredictedTrajectory: '''Class for predicted trajectories with lazy evaluation @@ -47,6 +48,20 @@ def getControl(self): return self.control +def findNearestOrientation(initialPosition,prototypeTrajectory): + distances=[] + for p in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, p)) + t=np.argmin(distances) + return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle +#todo: merge with previous function +def findNearestInstant(initialPosition,prototypeTrajectory): + distances=[] + for p in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, p)) + t=np.argmin(distances) + return t + class PredictedTrajectoryPrototype(PredictedTrajectory): '''Predicted trajectory that follows a prototype trajectory The prototype is in the format of a moving.Trajectory: it could be @@ -59,20 +74,39 @@ (applying a constant ratio equal to the ratio of the user instantaneous speed and the trajectory closest speed)''' - def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): + def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.): self.prototypeTrajectory = prototypeTrajectory self.constantSpeed = constantSpeed + self.speedList = speedList self.probability = probability self.predictedPositions = {0: initialPosition} - self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): - if constantSpeed: + if self.constantSpeed: # calculate cumulative distance - pass + speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm + anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + #pass + elif self.speedList!=[]: + pass else: # see c++ code, calculate ratio - pass + speedNorm= self.predictedSpeedOrientations[0].norm + instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory) + prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] + ratio=float(speedNorm)/prototypeSpeeds[0] + resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] + anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) + if nTimeSteps