# HG changeset patch # User Nicolas Saunier # Date 1711132405 14400 # Node ID fe35473acee3264ffeaca67b172128d373a54674 # Parent 2b1c8fe8f7e4bdc066cfe717cdab3b757d88900d adding method to compute PET using polygon for the outline of a vehicle (bird eye view of the vehicle) diff -r 2b1c8fe8f7e4 -r fe35473acee3 scripts/safety-analysis.py --- a/scripts/safety-analysis.py Fri Mar 15 17:05:54 2024 -0400 +++ b/scripts/safety-analysis.py Fri Mar 22 14:33:25 2024 -0400 @@ -90,11 +90,11 @@ interactions = events.createInteractions(objects) if args.nProcesses == 1: - processed = events.computeIndicators(interactions, not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None) + processed = events.computeIndicators(interactions, not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, False, params.predictionTimeHorizon, params.crossingZones, False, None) else: pool = Pool(processes = args.nProcesses) nInteractionPerProcess = int(np.ceil(len(interactions)/float(args.nProcesses))) - jobs = [pool.apply_async(events.computeIndicators, args = (interactions[i*nInteractionPerProcess:(i+1)*nInteractionPerProcess], not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None)) for i in range(args.nProcesses)] + jobs = [pool.apply_async(events.computeIndicators, args = (interactions[i*nInteractionPerProcess:(i+1)*nInteractionPerProcess], not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, False, params.predictionTimeHorizon, params.crossingZones, False, None)) for i in range(args.nProcesses)] processed = [] for job in jobs: processed += job.get() diff -r 2b1c8fe8f7e4 -r fe35473acee3 trafficintelligence/events.py --- a/trafficintelligence/events.py Fri Mar 15 17:05:54 2024 -0400 +++ b/trafficintelligence/events.py Fri Mar 22 14:33:25 2024 -0400 @@ -277,8 +277,9 @@ self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False)) # TODO add probability of collision, and probability of successful evasive action - def computePET(self, collisionDistanceThreshold): - pet, t1, t2= moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold) + def computePET(self, collisionDistanceThreshold, computePetWithBoundingPoly): + 'Warning: when computing PET from interactions, there could be PETs between objects that do not coexist and therefore are not considered interactions' + pet, t1, t2= moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold, computePetWithBoundingPoly) if pet is not None: self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[10], {min(t1, t2): pet}, mostSevereIsMax = False)) @@ -325,14 +326,14 @@ else: return None -def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): +def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, computePetWithBoundingPoly, timeHorizon, computeCZ = False, debug = False, timeInterval = None): for inter in interactions: print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum())) inter.computeIndicators() if computeMotionPrediction: inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval) if computePET: - inter.computePET(collisionDistanceThreshold) + inter.computePET(collisionDistanceThreshold, computePetWithBoundingPoly) return interactions def aggregateSafetyPoints(interactions, pointType = 'collision'): diff -r 2b1c8fe8f7e4 -r fe35473acee3 trafficintelligence/moving.py --- a/trafficintelligence/moving.py Fri Mar 15 17:05:54 2024 -0400 +++ b/trafficintelligence/moving.py Fri Mar 22 14:33:25 2024 -0400 @@ -4,7 +4,7 @@ import copy from math import sqrt, atan2, cos, sin, inf -from numpy import median, mean, array, arange, zeros, ones, hypot, NaN, std, floor, ceil, float32, argwhere, minimum, issubdtype, integer as npinteger, percentile +from numpy import median, mean, array, arange, zeros, ones, hypot, NaN, std, floor, ceil, float32, argwhere, minimum, issubdtype, integer as npinteger, percentile, full from matplotlib.pyplot import plot, text, arrow from scipy.spatial.distance import cdist from scipy.signal import savgol_filter @@ -1938,21 +1938,35 @@ self.prototypeSimilarities.append(similarities/minimum(arange(1., len(similarities)+1), proto.getMovingObject().length()*ones(len(similarities)))) @staticmethod - def computePET(obj1, obj2, collisionDistanceThreshold): + def computePET(obj1, obj2, collisionDistanceThreshold = None, useBoundingPoly = False): '''Post-encroachment time based on distance threshold Returns the smallest time difference when the object positions are within collisionDistanceThreshold and the instants at which each object is passing through its corresponding position''' - positions1 = [p.astuple() for p in obj1.getPositions()] - positions2 = [p.astuple() for p in obj2.getPositions()] - n1 = len(positions1) - n2 = len(positions2) + n1 = int(obj1.length()) + n2 = int(obj2.length()) pets = zeros((n1, n2)) for i,t1 in enumerate(obj1.getTimeInterval()): for j,t2 in enumerate(obj2.getTimeInterval()): pets[i,j] = abs(t1-t2) - distances = cdist(positions1, positions2, metric = 'euclidean') - smallDistances = (distances <= collisionDistanceThreshold) + if useBoundingPoly: + polygons1 = [pointsToShapely(obj1.getBoundingPolygon(i)) for i in obj1.getTimeInterval()] + polygons2 = [pointsToShapely(obj2.getBoundingPolygon(i)) for i in obj2.getTimeInterval()] + overlaps = [] + for poly2 in polygons2: + prep(poly2) + for poly1 in polygons1: + prep(poly1) + overlaps.append([poly1.overlaps(poly2) for poly2 in polygons2]) + smallDistances = array(overlaps) + elif collisionDistanceThreshold is not None: + positions1 = [p.astuple() for p in obj1.getPositions()] + positions2 = [p.astuple() for p in obj2.getPositions()] + distances = cdist(positions1, positions2, metric = 'euclidean') + smallDistances = (distances <= collisionDistanceThreshold) + else: + smallDistances = array([]) + if smallDistances.any(): smallPets = pets[smallDistances] petIdx = smallPets.argmin() diff -r 2b1c8fe8f7e4 -r fe35473acee3 trafficintelligence/tests/moving.txt --- a/trafficintelligence/tests/moving.txt Fri Mar 15 17:05:54 2024 -0400 +++ b/trafficintelligence/tests/moving.txt Fri Mar 22 14:33:25 2024 -0400 @@ -261,6 +261,11 @@ >>> MovingObject.computePET(o1, o2, 0.1) (15.0, 5, 20) +>>> o1 = MovingObject(1, TimeInterval(0,10), features=[MovingObject.generate(1, Point(0., 3.), Point(1., 0.), TimeInterval(0,10)), MovingObject.generate(2, Point(2., 3.), Point(1., 0.), TimeInterval(0,10)), MovingObject.generate(3, Point(2., 4.), Point(1., 0.), TimeInterval(0,10)), MovingObject.generate(4, Point(0., 4.), Point(1., 0.), TimeInterval(0,10))]) +>>> o2 = MovingObject(2, TimeInterval(0,10), features=[MovingObject.generate(5, Point(6., 0.), Point(0., 1.), TimeInterval(0,10)), MovingObject.generate(6, Point(7., 0.), Point(0., 1.), TimeInterval(0,10)), MovingObject.generate(7, Point(7., 2.), Point(0., 1.), TimeInterval(0,10)), MovingObject.generate(8, Point(6., 2.), Point(0., 1.), TimeInterval(0,10))]) +>>> MovingObject.computePET(o1, o2, useBoundingPoly = True) +(2.0, 5, 3) + >>> t1 = CurvilinearTrajectory.generate(3, 1., 10, 'b') >>> t1.length() 10