Mercurial > hg > nsaunier > traffic-intelligence
comparison trafficintelligence/moving.py @ 1098:469e36eea158
work in progress
| author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
|---|---|
| date | Tue, 19 Feb 2019 17:22:48 -0500 |
| parents | b3f8b26ee838 |
| children | 4ab5c63c13a3 |
comparison
equal
deleted
inserted
replaced
| 1097:b3f8b26ee838 | 1098:469e36eea158 |
|---|---|
| 1121 | 1121 |
| 1122 def getYCoordAt(self, i): | 1122 def getYCoordAt(self, i): |
| 1123 return self.positions[1][i] | 1123 return self.positions[1][i] |
| 1124 | 1124 |
| 1125 def getLaneAt(self, i): | 1125 def getLaneAt(self, i): |
| 1126 return self.positions[2][i] | 1126 return self.lanes[i] |
| 1127 | 1127 |
| 1128 def addPositionSYL(self, s, y, lane = None): | 1128 def addPositionSYL(self, s, y, lane = None): |
| 1129 self.addPositionXY(s,y) | 1129 self.addPositionXY(s,y) |
| 1130 self.lanes.append(lane) | 1130 self.lanes.append(lane) |
| 1131 | 1131 |
| 1141 def differentiate(self, doubleLastPosition = False): | 1141 def differentiate(self, doubleLastPosition = False): |
| 1142 diff = CurvilinearTrajectory() | 1142 diff = CurvilinearTrajectory() |
| 1143 p1 = self[0] | 1143 p1 = self[0] |
| 1144 for i in range(1, self.length()): | 1144 for i in range(1, self.length()): |
| 1145 p2 = self[i] | 1145 p2 = self[i] |
| 1146 diff.addPositionSYL(p2[0]-p1[0], p2[1]-p1[1]) | 1146 if p2[2] == p1[2]: |
| 1147 laneChange = None | |
| 1148 else: | |
| 1149 laneChange = (p1[2], p2[2]) | |
| 1150 diff.addPositionSYL(p2[0]-p1[0], p2[1]-p1[1], laneChange) | |
| 1147 p1=p2 | 1151 p1=p2 |
| 1148 if doubleLastPosition and self.length() > 1: | 1152 if doubleLastPosition and self.length() > 1: |
| 1149 diff.addPosition(diff[-1]) | 1153 diff.addPosition(diff[-1]) |
| 1150 return diff | 1154 return diff |
| 1151 | 1155 |
| 1189 | 1193 |
| 1190 def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, initCurvilinear = False): | 1194 def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, initCurvilinear = False): |
| 1191 super(MovingObject, self).__init__(num, timeInterval) | 1195 super(MovingObject, self).__init__(num, timeInterval) |
| 1192 if initCurvilinear: | 1196 if initCurvilinear: |
| 1193 self.curvilinearPositions = positions | 1197 self.curvilinearPositions = positions |
| 1194 self.curvilinearVelocities = velocities | 1198 self.curvilinearVelocities = velocities # third component is (previousAlignmentIdx, newAlignmentIdx) or None if no change |
| 1195 else: | 1199 else: |
| 1196 self.positions = positions | 1200 self.positions = positions |
| 1197 self.velocities = velocities | 1201 self.velocities = velocities |
| 1198 self.geometry = geometry | 1202 self.geometry = geometry |
| 1199 self.userType = userType | 1203 self.userType = userType |
| 1251 return MovingObject(num = num, timeInterval = timeInterval, positions = positions, velocities = Trajectory([[v.x]*nPoints, [v.y]*nPoints])) | 1255 return MovingObject(num = num, timeInterval = timeInterval, positions = positions, velocities = Trajectory([[v.x]*nPoints, [v.y]*nPoints])) |
| 1252 | 1256 |
| 1253 def updatePositions(self): | 1257 def updatePositions(self): |
| 1254 inter, self.positions, self.velocities = MovingObject.aggregateTrajectories(self.features, self.getTimeInterval()) | 1258 inter, self.positions, self.velocities = MovingObject.aggregateTrajectories(self.features, self.getTimeInterval()) |
| 1255 | 1259 |
| 1256 def updateCurvilinearPositions(self, method, changeOfAlignment, nextAlignment_idx, timeStep = None, instant = None, previousAlignmentId = None, maxSpeed = None, acceleration = None): | 1260 def updateCurvilinearPositions(self, method, instant, timeStep, _nextAlignmentIdx = None, maxSpeed = None, acceleration = None): |
| 1261 '''Update curvilinear position of user at new instant''' | |
| 1257 | 1262 |
| 1258 if method == 'newell': | 1263 if method == 'newell': |
| 1259 if self.curvilinearPositions is None: # vehicle without positions, all vehicles should have leader (?) | 1264 if self.curvilinearPositions is None: # vehicle without positions, all vehicles should have leader (?) |
| 1260 if self.leader.curvilinearPositions is not None and self.leader.curvilinearPositions.getSCoordAt(-1) > self.dn and len(self.leader.curvilinearPositions) >=2: | 1265 if self.leader is None: |
| 1261 leaderSpeed = self.leader.curvilinearVelocities.getSCoordAt(-1) | 1266 s = (timeStep*instant-self.initialHeadway)*self.desiredSpeed |
| 1262 instantAtX0 = self.tau + instant*timeStep - (self.leader.curvilinearPositions.getSCoordAt(-1)-self.d)/leaderSpeed | 1267 self.timeInterval = TimeInterval(instant, instant) |
| 1263 if instantAtX0 < obj.initialHeadway: #obj appears at instant initialHeadway at x=0 with desiredSpeed | 1268 self.curvilinearPositions = CurvilinearTrajectory([s], [0.], [self.initialAlignmentIdx]) |
| 1264 instantAtX0 = obj.initialHeadway | 1269 self.curvilinearVelocities = CurvilinearTrajectory() |
| 1265 | 1270 self.instantAtX0 = self.initialHeadway |
| 1266 firstInstant = int(np.ceil(instantAtX0/timeStep)) | 1271 elif self.leader.curvilinearPositions is not None and self.leader.curvilinearPositions.getSCoordAt(-1) > self.d and len(self.leader.curvilinearPositions) >=2: |
| 1272 firstInstantAfterD = self.leader.getLastInstant() | |
| 1273 while self.leader.existsAtInstant(firstInstantAfterD) and self.leader.getCurvilinearPositionAtInstant(firstInstantAfterD-1)[0] > self.d:# find first instant after d | |
| 1274 firstInstantAfterD -= 1 # if not recorded position before self.d, we extrapolate linearly from first known position | |
| 1275 leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(firstInstantAfterD)[0] | |
| 1276 self.instantAtX0 = self.tau + instant*timeStep - (self.leader.getCurvilinearPositionAtInstant(firstInstantAfterD)[0]-self.d)/leaderSpeed # second part is the time at which leader is at self.d | |
| 1277 if self.instantAtX0 < self.initialHeadway: #obj appears at instant initialHeadway at x=0 with desiredSpeed | |
| 1278 self.instantAtX0 = self.initialHeadway | |
| 1279 if self.instantAtX0 is not None and instant >= self.instantAtX0: | |
| 1280 firstInstant = int(ceil(self.instantAtX0/timeStep)) | |
| 1267 self.timeInterval = TimeInterval(firstInstant, firstInstant) | 1281 self.timeInterval = TimeInterval(firstInstant, firstInstant) |
| 1268 freeFlowCoord = (firstInstant*timeStep - instantAtX0)*self.desiredSpeed | 1282 freeFlowCoord = (firstInstant*timeStep - self.instantAtX0)*self.desiredSpeed |
| 1269 # constrainedCoord at firstInstant = xn-1(t = firstInstant*timeStep-self.tn)-self.dn | 1283 # constrainedCoord at firstInstant = xn-1(t = firstInstant*timeStep-self.tau)-self.d |
| 1270 t = firstInstant*timeStep-self.tn | 1284 t = firstInstant*timeStep-self.tau |
| 1271 i = int(floor(t/timeStep)) | 1285 i = int(floor(t/timeStep)) |
| 1272 leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(i)[0] | 1286 leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(i)[0] |
| 1273 constrainedCoord = self.leader.getCurvilinearPositionAtInstant(i)[0]+leaderSpeed*(t-i*timeStep)-self.dn | 1287 constrainedCoord = self.leader.getCurvilinearPositionAtInstant(i)[0]+leaderSpeed*(t-i*timeStep)-self.d |
| 1274 obj.curvilinearPositions = moving.CurvilinearTrajectory([min(freeFlowCoord, constrainedCoord)], [0.], [nextAlignment_idx])# TODO verify initial alignment index | 1288 self.curvilinearPositions = CurvilinearTrajectory([min(freeFlowCoord, constrainedCoord)], [0.], [self.initialAlignmentIdx]) |
| 1275 obj.curvilinearVelocities = moving.CurvilinearTrajectory() | 1289 self.curvilinearVelocities = CurvilinearTrajectory() |
| 1290 print(firstInstant+1, instant+1) | |
| 1276 for i in range(firstInstant+1, instant+1): | 1291 for i in range(firstInstant+1, instant+1): |
| 1277 freeFlowCoord = timeStep*self.desiredSpeed + self.curvilinearPositions.getSCoordAt(-1) | 1292 s1 = self.curvilinearPositions.getSCoordAt(-1) |
| 1278 #constrainedCoord = | 1293 freeFlowCoord = s1 + timeStep*self.desiredSpeed |
| 1279 | 1294 constrainedCoord = self.leader.interpolateCurvilinearPositions(i-self.tau/timeStep)[0]-self.d |
| 1280 # if instant <= self.timeInterval[0] < instant + timeStep: | 1295 s2 = min(freeFlowCoord, constrainedCoord) |
| 1281 # # #si t < instant de creation du vehicule, la position vaut l'espacement dn entre les deux vehicules | 1296 self.curvilinearPositions.addPositionSYL(s2, 0., self.initialAlignmentIdx) |
| 1282 # if leaderVehicle is None: | 1297 self.setLastInstant(i) |
| 1283 # self.curvilinearPositions.addPositionSYL( | 1298 print(i, self.timeInterval) |
| 1284 # -self.dn, | 1299 self.curvilinearVelocities.addPositionSYL(s2-s1, 0., None) |
| 1285 # 0, | |
| 1286 # nextAlignment_idx) | |
| 1287 # else: | |
| 1288 # self.curvilinearPositions.addPositionSYL( | |
| 1289 # leaderVehicle.curvilinearPositions[0][0] - self.dn, | |
| 1290 # 0, | |
| 1291 # nextAlignment_idx) | |
| 1292 else: | 1300 else: |
| 1293 freeFlowCoord = [self.curvilinearPositions[-1][0]+self.desiredSpeed*timeStep, 0, nextAlignment_idx] | 1301 if _nextAlignmentIdx is not None: |
| 1302 laneChange = (self.curvilinearPositions.getLaneAt(-1), _nextAlignmentIdx) | |
| 1303 nextAlignmentIdx = _nextAlignmentIdx | |
| 1304 else: | |
| 1305 laneChange = None | |
| 1306 nextAlignmentIdx = self.curvilinearPositions.getLaneAt(-1) | |
| 1307 s1 = self.curvilinearPositions.getSCoordAt(-1) | |
| 1308 freeFlowCoord = s1 + self.desiredSpeed*timeStep | |
| 1294 if self.leader is None: | 1309 if self.leader is None: |
| 1295 self.curvilinearPositions.addPositionSYL(*freeFlowCoord) | 1310 if self.getLastInstant() < instant: |
| 1311 s2 = freeFlowCoord | |
| 1312 self.curvilinearPositions.addPositionSYL(freeFlowCoord, 0., nextAlignmentIdx) | |
| 1296 else: | 1313 else: |
| 1297 # if leader coord unknown at t-dn, no movement | 1314 constrainedCoord = self.leader.interpolateCurvilinearPositions(instant-self.tau/timeStep)[0]-self.d |
| 1298 if instant > self.reactionTime: | 1315 s2 = min(freeFlowCoord, constrainedCoord) |
| 1299 previousVehicleCurvilinearPositionAtPrecedentInstant = leaderVehicle.curvilinearPositions[-int(round(self.reactionTime))][0] # t-v.reactionTime | 1316 self.curvilinearPositions.addPositionSYL(s2, 0., nextAlignmentIdx) |
| 1300 else: | 1317 self.setLastInstant(instant) |
| 1301 previousVehicleCurvilinearPositionAtPrecedentInstant = \ | 1318 self.curvilinearVelocities.addPositionSYL(s2-s1, 0., laneChange) |
| 1302 leaderVehicle.curvilinearPositions[0][0] | |
| 1303 | |
| 1304 self.curvilinearPositions.addPositionSYL(previousVehicleCurvilinearPositionAtPrecedentInstant - self.dn, 0, nextAlignment_idx) | |
| 1305 | |
| 1306 #mise ajour des vitesses | |
| 1307 if changeOfAlignment: | |
| 1308 self.curvilinearVelocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, | |
| 1309 (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, | |
| 1310 (previousAlignmentId, nextAlignment_idx)) | |
| 1311 else: | |
| 1312 self.curvilinearVelocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, | |
| 1313 (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, | |
| 1314 None) | |
| 1315 | 1319 |
| 1316 @staticmethod | 1320 @staticmethod |
| 1317 def concatenate(obj1, obj2, num = None, newFeatureNum = None, computePositions = False): | 1321 def concatenate(obj1, obj2, num = None, newFeatureNum = None, computePositions = False): |
| 1318 '''Concatenates two objects, whether overlapping temporally or not | 1322 '''Concatenates two objects, whether overlapping temporally or not |
| 1319 | 1323 |
