Mercurial > hg > nsaunier > traffic-intelligence
diff scripts/process.py @ 1077:3939ae415be0
Merging
| author | Wendlasida |
|---|---|
| date | Fri, 20 Jul 2018 14:03:34 -0400 |
| parents | 0154133e77df |
| children | 58994b08be42 |
line wrap: on
line diff
--- a/scripts/process.py Fri Jul 20 13:50:43 2018 -0400 +++ b/scripts/process.py Fri Jul 20 14:03:34 2018 -0400 @@ -8,7 +8,7 @@ #atplotlib.use('Agg') import matplotlib.pyplot as plt import numpy as np -from pandas import DataFrame +import pandas as pd from trafficintelligence import storage, events, prediction, cvutils, utils, moving, processing, ml from trafficintelligence.metadata import * @@ -16,14 +16,15 @@ parser = argparse.ArgumentParser(description='This program manages the processing of several files based on a description of the sites and video data in an SQLite database following the metadata module.') # input parser.add_argument('--db', dest = 'metadataFilename', help = 'name of the metadata file', required = True) -parser.add_argument('--videos', dest = 'videoIds', help = 'indices of the video sequences', nargs = '*', type = int) +parser.add_argument('--videos', dest = 'videoIds', help = 'indices of the video sequences', nargs = '*') parser.add_argument('--sites', dest = 'siteIds', help = 'indices of the video sequences', nargs = '*') # main function parser.add_argument('--delete', dest = 'delete', help = 'data to delete', choices = ['feature', 'object', 'classification', 'interaction']) parser.add_argument('--process', dest = 'process', help = 'data to process', choices = ['feature', 'object', 'classification', 'prototype', 'interaction']) parser.add_argument('--display', dest = 'display', help = 'data to display (replay over video)', choices = ['feature', 'object', 'classification', 'interaction']) -parser.add_argument('--analyze', dest = 'analyze', help = 'data to analyze (results)', choices = ['feature', 'object', 'classification', 'interaction']) +parser.add_argument('--progress', dest = 'progress', help = 'information about the progress of processing', action = 'store_true') +parser.add_argument('--analyze', dest = 'analyze', help = 'data to analyze (results)', choices = ['feature', 'object', 'classification', 'interaction', 'event']) # common options parser.add_argument('--cfg', dest = 'configFilename', help = 'name of the configuration file') @@ -60,11 +61,13 @@ # analysis options parser.add_argument('--output', dest = 'output', help = 'kind of output to produce (interval means)', choices = ['figure', 'interval', 'event']) parser.add_argument('--min-user-duration', dest = 'minUserDuration', help = 'mininum duration we have to see the user to take into account in the analysis (s)', type = float, default = 0.1) -parser.add_argument('--interval-duration', dest = 'intervalDuration', help = 'length of time interval to aggregate data (min)', type = float, default = 15.) -parser.add_argument('--aggregation', dest = 'aggMethod', help = 'aggregation method per user/event and per interval', choices = ['mean', 'median', 'centile'], nargs = '*', default = ['median']) -parser.add_argument('--aggregation-centile', dest = 'aggCentiles', help = 'centile(s) to compute from the observations', nargs = '*', type = int) +parser.add_argument('--interval-duration', dest = 'intervalDuration', help = 'length of time interval to aggregate data (min)', type = int, default = 15) +parser.add_argument('--aggregation', dest = 'aggMethods', help = 'aggregation method per user/interaction and per interval', choices = ['mean', 'median', 'centile'], nargs = '*', default = ['median']) +parser.add_argument('--aggregation-centiles', dest = 'aggCentiles', help = 'centile(s) to compute from the observations', nargs = '*', type = int) +parser.add_argument('--event-thresholds', dest = 'eventThresholds', help = 'threshold to count severe situations', nargs = '*', type = float) +parser.add_argument('--event-filename', dest = 'eventFilename', help = 'filename of the event data') dpi = 150 -# unit of analysis: site or video sequence? +# unit of analysis: site - camera-view # need way of selecting sites as similar as possible to sql alchemy syntax # override tracking.cfg from db @@ -82,16 +85,22 @@ videoSequences = [] sites = [] if args.videoIds is not None: - videoSequences = [session.query(VideoSequence).get(videoId) for videoId in args.videoIds] - siteIds = set([vs.cameraView.siteIdx for vs in videoSequences]) + for videoId in args.videoIds: + if '-' in videoId: + videoSequences.extend([session.query(VideoSequence).get(i) for i in moving.TimeInterval.parse(videoId)]) + else: + videoSequences.append(session.query(VideoSequence).get(int(videoId))) + videoSequences = [vs for vs in videoSequences if vs is not None] + sites = set([vs.cameraView.site for vs in videoSequences]) elif args.siteIds is not None: - siteIds = set(args.siteIds) - for siteId in siteIds: - tmpsites = getSite(session, siteId) - sites.extend(tmpsites) - for site in tmpsites: - for cv in site.cameraViews: - videoSequences.extend(cv.videoSequences) + for siteId in args.siteIds: + if '-' in siteId: + sites.extend([session.query(Site).get(i) for i in moving.TimeInterval.parse(siteId)]) + else: + sites.append(session.query(Site).get(int(siteId))) + sites = [s for s in sites if s is not None] + for site in sites: + videoSequences.extend(getSiteVideoSequences(site)) else: print('No video/site to process') @@ -99,6 +108,34 @@ pool = Pool(args.nProcesses) ################################# +# Report progress in the processing +################################# +if args.progress: + print('Providing information on data progress') + headers = ['site', 'vs', 'features', 'objects', 'interactions'] # todo add prototypes and object classification + data = [] + for site in sites: + unprocessedVideoSequences = [] + for vs in getSiteVideoSequences(site): + if (parentPath/vs.getDatabaseFilename()).is_file(): # TODO check time of file? + tableNames = storage.tableNames(str(parentPath.absolute()/vs.getDatabaseFilename())) + data.append([site.name, vs.idx, 'positions' in tableNames, 'objects' in tableNames, 'interactions' in tableNames]) + else: + unprocessedVideoSequences.append(vs) + data.append([site.name, vs.idx, False, False, False]) + #if len(unprocessedVideoSequences): + # print('Site {} ({}) has {} completely unprocessed video sequences'.format (site.name, site.idx, len(unprocessedVideoSequences))) + data = pd.DataFrame(data, columns = headers) + print('-'*80) + print('\t'+' '.join(headers[2:])) + print('-'*80) + for name, group in data.groupby(['site']): #.agg({'vs': 'count'})) + n = group.vs.count() + print('{}: {} % / {} % / {} % ({})'.format(name, 100*group.features.sum()/float(n), 100*group.objects.sum()/float(n), 100*group.interactions.sum()/float(n), n)) + print('-'*80) + print(data) + +################################# # Delete ################################# if args.delete is not None: @@ -119,20 +156,20 @@ if args.process in ['feature', 'object']: # tracking if args.nProcesses == 1: for vs in videoSequences: - if not (parentPath/vs.getDatabaseFilename()).exists() or args.process == 'object': + if not (parentPath/vs.getDatabaseFilename()).is_file() or args.process == 'object': if args.configFilename is None: configFilename = str(parentPath/vs.cameraView.getTrackingConfigurationFilename()) else: configFilename = args.configFilename if vs.cameraView.cameraType is None: cvutils.tracking(configFilename, args.process == 'object', str(parentPath.absolute()/vs.getVideoSequenceFilename()), str(parentPath.absolute()/vs.getDatabaseFilename()), str(parentPath.absolute()/vs.cameraView.getHomographyFilename()), str(parentPath.absolute()/vs.cameraView.getMaskFilename()), False, None, None, args.dryRun) - else: + else: #caution: cameratype can be not none, but without parameters for undistortion cvutils.tracking(configFilename, args.process == 'object', str(parentPath.absolute()/vs.getVideoSequenceFilename()), str(parentPath.absolute()/vs.getDatabaseFilename()), str(parentPath.absolute()/vs.cameraView.getHomographyFilename()), str(parentPath.absolute()/vs.cameraView.getMaskFilename()), True, vs.cameraView.cameraType.intrinsicCameraMatrix, vs.cameraView.cameraType.distortionCoefficients, args.dryRun) else: print('SQLite already exists: {}'.format(parentPath/vs.getDatabaseFilename())) else: for vs in videoSequences: - if not (parentPath/vs.getDatabaseFilename()).exists() or args.process == 'object': + if not (parentPath/vs.getDatabaseFilename()).is_file() or args.process == 'object': if args.configFilename is None: configFilename = str(parentPath/vs.cameraView.getTrackingConfigurationFilename()) else: @@ -147,7 +184,7 @@ pool.join() elif args.process == 'prototype': # motion pattern learning - # learn by site by default -> group videos by site (or by camera view? TODO add cameraviews) + # learn by site by default -> group videos by camera view TODO # by default, load all objects, learn and then assign (BUT not save the assignments) for site in sites: print('Learning motion patterns for site {} ({})'.format(site.idx, site.name)) @@ -177,7 +214,6 @@ outputPrototypeDatabaseFilename = args.databaseFilename else: outputPrototypeDatabaseFilename = args.outputPrototypeDatabaseFilename - # TODO maintain mapping from object prototype to db filename + compute nmatchings before clusterSizes = ml.computeClusterSizes(labels, prototypeIndices, -1) storage.savePrototypesToSqlite(str(parentPath/site.getPath()/outputPrototypeDatabaseFilename), [moving.Prototype(object2VideoSequences[trainingObjects[i]].getDatabaseFilename(False), trainingObjects[i].getNum(), prototypeType, clusterSizes[i]) for i in prototypeIndices]) @@ -213,46 +249,34 @@ if args.analyze == 'object': # user speeds, accelerations # aggregation per site + if args.eventFilename is None: + print('Missing output filename (event-filename). Exiting') + sys.exit(0) data = [] # list of observation per site-user with time - headers = ['sites', 'date', 'time', 'user_type'] - aggFunctions = {} - for method in args.aggMethod: - if method == 'centile': - aggFunctions[method] = utils.aggregationFunction(method, args.aggCentiles) - for c in args.aggCentiles: - headers.append('{}{}'.format(method,c)) - else: - aggFunctions[method] = utils.aggregationFunction(method) - headers.append(method) - for vs in videoSequences: - d = vs.startTime.date() - t1 = vs.startTime.time() - minUserDuration = args.minUserDuration*vs.cameraView.cameraType.frameRate - print('Extracting speed from '+vs.getDatabaseFilename()) - objects = storage.loadTrajectoriesFromSqlite(str(parentPath/vs.getDatabaseFilename()), 'object', args.nObjects) - for o in objects: - if o.length() > minUserDuration: - row = [vs.cameraView.siteIdx, d, utils.framesToTime(o.getFirstInstant(), vs.cameraView.cameraType.frameRate, t1), o.getUserType()] - tmp = o.getSpeeds() - for method,func in aggFunctions.items(): - aggSpeeds = vs.cameraView.cameraType.frameRate*3.6*func(tmp) - if method == 'centile': - row += aggSpeeds.tolist() - else: - row.append(aggSpeeds) - data.append(row) - data = DataFrame(data, columns = headers) + headers = ['site', 'date', 'time', 'user_type'] + aggFunctions, tmpheaders = utils.aggregationMethods(args.aggMethods, args.aggCentiles) + headers.extend(tmpheaders) + if args.nProcesses == 1: + for vs in videoSequences: + data.extend(processing.extractVideoSequenceSpeeds(str(parentPath/vs.getDatabaseFilename()), vs.cameraView.site.name, args.nObjects, vs.startTime, vs.cameraView.cameraType.frameRate, args.minUserDuration, args.aggMethods, args.aggCentiles)) + else: + jobs = [pool.apply_async(processing.extractVideoSequenceSpeeds, args = (str(parentPath/vs.getDatabaseFilename()), vs.cameraView.site.name, args.nObjects, vs.startTime, vs.cameraView.cameraType.frameRate, args.minUserDuration, args.aggMethods, args.aggCentiles)) for vs in videoSequences] + for job in jobs: + data.extend(job.get()) + pool.close() + data = pd.DataFrame(data, columns = headers) if args.output == 'figure': for name in headers[4:]: plt.ioff() plt.figure() - plt.boxplot([data.loc[data['sites']==siteId, name] for siteId in siteIds], labels = [session.query(Site).get(siteId).name for siteId in siteIds]) + plt.boxplot([data.loc[data['site']==site.name, name] for site in sites], labels = [site.name for site in sites]) plt.ylabel(name+' Speeds (km/h)') plt.savefig(name.lower()+'-speeds.png', dpi=dpi) plt.close() elif args.output == 'event': - data.to_csv('speeds.csv', index = False) -if args.analyze == 'interaction': + data.to_csv(args.eventFilename, index = False) + +if args.analyze == 'interaction': # redo as for object, export in dataframe all interaction data indicatorIds = [2,5,7,10] conversionFactors = {2: 1., 5: 30.*3.6, 7:1./30, 10:1./30} maxIndicatorValue = {2: float('inf'), 5: float('inf'), 7:10., 10:10.} @@ -282,3 +306,38 @@ plt.ylabel(events.Interaction.indicatorNames[i]+' ('+events.Interaction.indicatorUnits[i]+')') plt.savefig(events.Interaction.indicatorNames[i]+'.png', dpi=150) plt.close() + +if args.analyze == 'event': # aggregate event data by 15 min interval (args.intervalDuration), count events with thresholds + data = pd.read_csv(args.eventFilename, parse_dates = [2]) + #data = pd.read_csv('./speeds.csv', converters = {'time': lambda s: datetime.datetime.strptime(s, "%H:%M:%S").time()}, nrows = 5000) + # create time for end of each 15 min, then group by, using the agg method for each data column + headers = ['site', 'date', 'intervalend15', 'duration', 'count'] + aggFunctions, tmpheaders = utils.aggregationMethods(args.aggMethods, args.aggCentiles) + dataColumns = list(data.columns[4:]) + for h in dataColumns: + for h2 in tmpheaders: + headers.append(h+'-'+h2) + for h in dataColumns: + for t in args.eventThresholds: + headers.append('n-{}-{}'.format(h, t)) + data['intervalend15'] = data.time.apply(lambda t: (pd.Timestamp(year = t.year, month = t.month, day = t.day,hour = t.hour, minute = (t.minute // args.intervalDuration)*args.intervalDuration)+pd.Timedelta(minutes = 15)).time()) + outputData = [] + for name, group in data.groupby(['site', 'date', 'intervalend15']): + row = [] + row.extend(name) + groupStartTime = group.time.min() + groupEndTime = group.time.max() + row.append((groupEndTime.minute+1-groupStartTime.minute) % 60)#(name[2].minute*60+name[2].second-groupStartTime.minute*60+groupStartTime.second) % 3600) + row.append(len(group)) + for h in dataColumns: + for method,func in aggFunctions.items(): + aggregated = func(group[h]) + if method == 'centile': + row.extend(aggregated) + else: + row.append(aggregated) + for h in dataColumns: + for t in args.eventThresholds: + row.append((group[h] > t).sum()) + outputData.append(row) + pd.DataFrame(outputData, columns = headers).to_csv(utils.removeExtension(args.eventFilename)+'-aggregated.csv', index = False)
