diff scripts/process.py @ 1077:3939ae415be0

Merging
author Wendlasida
date Fri, 20 Jul 2018 14:03:34 -0400
parents 0154133e77df
children 58994b08be42
line wrap: on
line diff
--- a/scripts/process.py	Fri Jul 20 13:50:43 2018 -0400
+++ b/scripts/process.py	Fri Jul 20 14:03:34 2018 -0400
@@ -8,7 +8,7 @@
 #atplotlib.use('Agg')
 import matplotlib.pyplot as plt
 import numpy as np
-from pandas import DataFrame
+import pandas as pd
 
 from trafficintelligence import storage, events, prediction, cvutils, utils, moving, processing, ml
 from trafficintelligence.metadata import *
@@ -16,14 +16,15 @@
 parser = argparse.ArgumentParser(description='This program manages the processing of several files based on a description of the sites and video data in an SQLite database following the metadata module.')
 # input
 parser.add_argument('--db', dest = 'metadataFilename', help = 'name of the metadata file', required = True)
-parser.add_argument('--videos', dest = 'videoIds', help = 'indices of the video sequences', nargs = '*', type = int)
+parser.add_argument('--videos', dest = 'videoIds', help = 'indices of the video sequences', nargs = '*')
 parser.add_argument('--sites', dest = 'siteIds', help = 'indices of the video sequences', nargs = '*')
 
 # main function
 parser.add_argument('--delete', dest = 'delete', help = 'data to delete', choices = ['feature', 'object', 'classification', 'interaction'])
 parser.add_argument('--process', dest = 'process', help = 'data to process', choices = ['feature', 'object', 'classification', 'prototype', 'interaction'])
 parser.add_argument('--display', dest = 'display', help = 'data to display (replay over video)', choices = ['feature', 'object', 'classification', 'interaction'])
-parser.add_argument('--analyze', dest = 'analyze', help = 'data to analyze (results)', choices = ['feature', 'object', 'classification', 'interaction'])
+parser.add_argument('--progress', dest = 'progress', help = 'information about the progress of processing', action = 'store_true')
+parser.add_argument('--analyze', dest = 'analyze', help = 'data to analyze (results)', choices = ['feature', 'object', 'classification', 'interaction', 'event'])
 
 # common options
 parser.add_argument('--cfg', dest = 'configFilename', help = 'name of the configuration file')
@@ -60,11 +61,13 @@
 # analysis options
 parser.add_argument('--output', dest = 'output', help = 'kind of output to produce (interval means)', choices = ['figure', 'interval', 'event'])
 parser.add_argument('--min-user-duration', dest = 'minUserDuration', help = 'mininum duration we have to see the user to take into account in the analysis (s)', type = float, default = 0.1)
-parser.add_argument('--interval-duration', dest = 'intervalDuration', help = 'length of time interval to aggregate data (min)', type = float, default = 15.)
-parser.add_argument('--aggregation', dest = 'aggMethod', help = 'aggregation method per user/event and per interval', choices = ['mean', 'median', 'centile'], nargs = '*', default = ['median'])
-parser.add_argument('--aggregation-centile', dest = 'aggCentiles', help = 'centile(s) to compute from the observations', nargs = '*', type = int)
+parser.add_argument('--interval-duration', dest = 'intervalDuration', help = 'length of time interval to aggregate data (min)', type = int, default = 15)
+parser.add_argument('--aggregation', dest = 'aggMethods', help = 'aggregation method per user/interaction and per interval', choices = ['mean', 'median', 'centile'], nargs = '*', default = ['median'])
+parser.add_argument('--aggregation-centiles', dest = 'aggCentiles', help = 'centile(s) to compute from the observations', nargs = '*', type = int)
+parser.add_argument('--event-thresholds', dest = 'eventThresholds', help = 'threshold to count severe situations', nargs = '*', type = float)
+parser.add_argument('--event-filename', dest = 'eventFilename', help = 'filename of the event data')
 dpi = 150
-# unit of analysis: site or video sequence?
+# unit of analysis: site - camera-view
 
 # need way of selecting sites as similar as possible to sql alchemy syntax
 # override tracking.cfg from db
@@ -82,16 +85,22 @@
 videoSequences = []
 sites = []
 if args.videoIds is not None:
-    videoSequences = [session.query(VideoSequence).get(videoId) for videoId in args.videoIds]
-    siteIds = set([vs.cameraView.siteIdx for vs in videoSequences])
+    for videoId in args.videoIds:
+        if '-' in videoId:
+            videoSequences.extend([session.query(VideoSequence).get(i) for i in moving.TimeInterval.parse(videoId)])
+        else:
+            videoSequences.append(session.query(VideoSequence).get(int(videoId)))
+    videoSequences = [vs for vs in videoSequences if vs is not None]
+    sites = set([vs.cameraView.site for vs in videoSequences])
 elif args.siteIds is not None:
-    siteIds = set(args.siteIds)
-    for siteId in siteIds:
-        tmpsites = getSite(session, siteId)
-        sites.extend(tmpsites)
-        for site in tmpsites:
-            for cv in site.cameraViews:
-                videoSequences.extend(cv.videoSequences)
+    for siteId in args.siteIds:
+        if '-' in siteId:
+            sites.extend([session.query(Site).get(i) for i in moving.TimeInterval.parse(siteId)])
+        else:
+            sites.append(session.query(Site).get(int(siteId)))
+    sites = [s for s in sites if s is not None]
+    for site in sites:
+        videoSequences.extend(getSiteVideoSequences(site))
 else:
     print('No video/site to process')
 
@@ -99,6 +108,34 @@
     pool = Pool(args.nProcesses)
 
 #################################
+# Report progress in the processing
+#################################
+if args.progress: # report, for each selected site, which processing outputs exist for its video sequences
+    print('Providing information on data progress')
+    headers = ['site', 'vs', 'features', 'objects', 'interactions'] # todo add prototypes and object classification
+    data = [] # one row per video sequence: site name, video sequence idx, then one boolean per processing stage
+    for site in sites:
+        unprocessedVideoSequences = [] # video sequences without any result database (only used by the disabled message below)
+        for vs in getSiteVideoSequences(site):
+            if (parentPath/vs.getDatabaseFilename()).is_file(): # TODO check time of file?
+                tableNames = storage.tableNames(str(parentPath.absolute()/vs.getDatabaseFilename()))
+                data.append([site.name, vs.idx, 'positions' in tableNames, 'objects' in tableNames, 'interactions' in tableNames]) # a stage is reported as done if its table exists
+            else:
+                unprocessedVideoSequences.append(vs)
+                data.append([site.name, vs.idx, False, False, False])
+        #if len(unprocessedVideoSequences):
+        #    print('Site {} ({}) has {} completely unprocessed video sequences'.format (site.name, site.idx, len(unprocessedVideoSequences)))
+    data = pd.DataFrame(data, columns = headers)
+    print('-'*80)
+    print('\t'+' '.join(headers[2:]))
+    print('-'*80)
+    for name, group in data.groupby('site'): # scalar key: name is the site name itself (a one-element list key yields 1-tuples in pandas >= 2)
+        n = group.vs.count()
+        print('{}: {} % / {} % / {} % ({})'.format(name, 100*group.features.sum()/float(n), 100*group.objects.sum()/float(n), 100*group.interactions.sum()/float(n), n)) # percentage of video sequences with each stage, then their number
+    print('-'*80)
+    print(data)
+
+#################################
 # Delete
 #################################
 if args.delete is not None:
@@ -119,20 +156,20 @@
 if args.process in ['feature', 'object']: # tracking
     if args.nProcesses == 1:
         for vs in videoSequences:
-            if not (parentPath/vs.getDatabaseFilename()).exists() or args.process == 'object':
+            if not (parentPath/vs.getDatabaseFilename()).is_file() or args.process == 'object':
                 if args.configFilename is None:
                     configFilename = str(parentPath/vs.cameraView.getTrackingConfigurationFilename())
                 else:
                     configFilename = args.configFilename
                 if vs.cameraView.cameraType is None:
                     cvutils.tracking(configFilename, args.process == 'object', str(parentPath.absolute()/vs.getVideoSequenceFilename()), str(parentPath.absolute()/vs.getDatabaseFilename()), str(parentPath.absolute()/vs.cameraView.getHomographyFilename()), str(parentPath.absolute()/vs.cameraView.getMaskFilename()), False, None, None, args.dryRun)
-                else:
+                else: # caution: cameraType may be set without the parameters needed for undistortion
                     cvutils.tracking(configFilename, args.process == 'object', str(parentPath.absolute()/vs.getVideoSequenceFilename()), str(parentPath.absolute()/vs.getDatabaseFilename()), str(parentPath.absolute()/vs.cameraView.getHomographyFilename()), str(parentPath.absolute()/vs.cameraView.getMaskFilename()), True, vs.cameraView.cameraType.intrinsicCameraMatrix, vs.cameraView.cameraType.distortionCoefficients, args.dryRun)
             else:
                 print('SQLite already exists: {}'.format(parentPath/vs.getDatabaseFilename()))
     else:
         for vs in videoSequences:
-            if not (parentPath/vs.getDatabaseFilename()).exists() or args.process == 'object':
+            if not (parentPath/vs.getDatabaseFilename()).is_file() or args.process == 'object':
                 if args.configFilename is None:
                     configFilename = str(parentPath/vs.cameraView.getTrackingConfigurationFilename())
                 else:
@@ -147,7 +184,7 @@
         pool.join()
 
 elif args.process == 'prototype': # motion pattern learning
-    # learn by site by default -> group videos by site (or by camera view? TODO add cameraviews)
+    # learn by site by default -> group videos by camera view TODO
     # by default, load all objects, learn and then assign (BUT not save the assignments)
     for site in sites:
         print('Learning motion patterns for site {} ({})'.format(site.idx, site.name))
@@ -177,7 +214,6 @@
             outputPrototypeDatabaseFilename = args.databaseFilename
         else:
             outputPrototypeDatabaseFilename = args.outputPrototypeDatabaseFilename
-        # TODO maintain mapping from object prototype to db filename + compute nmatchings before
         clusterSizes = ml.computeClusterSizes(labels, prototypeIndices, -1)
         storage.savePrototypesToSqlite(str(parentPath/site.getPath()/outputPrototypeDatabaseFilename), [moving.Prototype(object2VideoSequences[trainingObjects[i]].getDatabaseFilename(False), trainingObjects[i].getNum(), prototypeType, clusterSizes[i]) for i in prototypeIndices])
 
@@ -213,46 +249,34 @@
 if args.analyze == 'object':
     # user speeds, accelerations
     # aggregation per site
+    if args.eventFilename is None:
+        print('Missing output filename (event-filename). Exiting')
+        sys.exit(0)
     data = [] # list of observation per site-user with time
-    headers = ['sites', 'date', 'time', 'user_type']
-    aggFunctions = {}
-    for method in args.aggMethod:
-        if method == 'centile':
-            aggFunctions[method] = utils.aggregationFunction(method, args.aggCentiles)
-            for c in args.aggCentiles:
-                headers.append('{}{}'.format(method,c))
-        else:
-            aggFunctions[method] = utils.aggregationFunction(method)
-            headers.append(method)
-    for vs in videoSequences:
-        d = vs.startTime.date()
-        t1 = vs.startTime.time()
-        minUserDuration = args.minUserDuration*vs.cameraView.cameraType.frameRate
-        print('Extracting speed from '+vs.getDatabaseFilename())
-        objects = storage.loadTrajectoriesFromSqlite(str(parentPath/vs.getDatabaseFilename()), 'object', args.nObjects)
-        for o in objects:
-            if o.length() > minUserDuration:
-                row = [vs.cameraView.siteIdx, d, utils.framesToTime(o.getFirstInstant(), vs.cameraView.cameraType.frameRate, t1), o.getUserType()]
-                tmp = o.getSpeeds()
-                for method,func in aggFunctions.items():
-                    aggSpeeds = vs.cameraView.cameraType.frameRate*3.6*func(tmp)
-                    if method == 'centile':
-                        row += aggSpeeds.tolist()
-                    else:
-                        row.append(aggSpeeds)
-            data.append(row)
-    data = DataFrame(data, columns = headers)
+    headers = ['site', 'date', 'time', 'user_type']
+    aggFunctions, tmpheaders = utils.aggregationMethods(args.aggMethods, args.aggCentiles)
+    headers.extend(tmpheaders)
+    if args.nProcesses == 1:
+        for vs in videoSequences:
+            data.extend(processing.extractVideoSequenceSpeeds(str(parentPath/vs.getDatabaseFilename()), vs.cameraView.site.name, args.nObjects, vs.startTime, vs.cameraView.cameraType.frameRate, args.minUserDuration, args.aggMethods, args.aggCentiles))
+    else:
+        jobs = [pool.apply_async(processing.extractVideoSequenceSpeeds, args = (str(parentPath/vs.getDatabaseFilename()), vs.cameraView.site.name, args.nObjects, vs.startTime, vs.cameraView.cameraType.frameRate, args.minUserDuration, args.aggMethods, args.aggCentiles)) for vs in videoSequences]
+        for job in jobs:
+            data.extend(job.get())
+        pool.close()
+    data = pd.DataFrame(data, columns = headers)
     if args.output == 'figure':
         for name in headers[4:]:
             plt.ioff()
             plt.figure()
-            plt.boxplot([data.loc[data['sites']==siteId, name] for siteId in siteIds], labels = [session.query(Site).get(siteId).name for siteId in siteIds])
+            plt.boxplot([data.loc[data['site']==site.name, name] for site in sites], labels = [site.name for site in sites])
             plt.ylabel(name+' Speeds (km/h)')
             plt.savefig(name.lower()+'-speeds.png', dpi=dpi)
             plt.close()
     elif args.output == 'event':
-        data.to_csv('speeds.csv', index = False)
-if args.analyze == 'interaction':
+        data.to_csv(args.eventFilename, index = False)
+
+if args.analyze == 'interaction': # redo as for object, export in dataframe all interaction data
     indicatorIds = [2,5,7,10]
     conversionFactors = {2: 1., 5: 30.*3.6, 7:1./30, 10:1./30}
     maxIndicatorValue = {2: float('inf'), 5: float('inf'), 7:10., 10:10.}
@@ -282,3 +306,38 @@
         plt.ylabel(events.Interaction.indicatorNames[i]+' ('+events.Interaction.indicatorUnits[i]+')')
         plt.savefig(events.Interaction.indicatorNames[i]+'.png', dpi=150)
         plt.close()
+
+if args.analyze == 'event': # aggregate event data per interval of args.intervalDuration minutes, count events above thresholds
+    data = pd.read_csv(args.eventFilename, parse_dates = [2])
+    thresholds = args.eventThresholds or [] # the option has no default: without it, no threshold count column is produced
+    # create time for end of each interval, then group by, using the agg method for each data column
+    headers = ['site', 'date', 'intervalend15', 'duration', 'count'] # 'intervalend15' name kept for existing outputs, whatever the interval duration
+    aggFunctions, tmpheaders = utils.aggregationMethods(args.aggMethods, args.aggCentiles)
+    dataColumns = list(data.columns[4:]) # columns after the first four are treated as the data to aggregate
+    for h in dataColumns:
+        for h2 in tmpheaders:
+            headers.append(h+'-'+h2)
+    for h in dataColumns:
+        for t in thresholds:
+            headers.append('n-{}-{}'.format(h, t))
+    data['intervalend15'] = data.time.apply(lambda t: (pd.Timestamp(year = t.year, month = t.month, day = t.day,hour = t.hour, minute = (t.minute // args.intervalDuration)*args.intervalDuration)+pd.Timedelta(minutes = args.intervalDuration)).time()) # floor to the interval start, then add one interval (was a fixed 15 min)
+    outputData = []
+    for name, group in data.groupby(['site', 'date', 'intervalend15']):
+        row = []
+        row.extend(name) # name is the (site, date, intervalend15) key of the group
+        groupStartTime = group.time.min()
+        groupEndTime = group.time.max()
+        row.append((groupEndTime.minute+1-groupStartTime.minute) % 60)#(name[2].minute*60+name[2].second-groupStartTime.minute*60+groupStartTime.second) % 3600)
+        row.append(len(group))
+        for h in dataColumns:
+            for method,func in aggFunctions.items():
+                aggregated = func(group[h])
+                if method == 'centile': # centile aggregation returns several values, one per requested centile
+                    row.extend(aggregated)
+                else:
+                    row.append(aggregated)
+        for h in dataColumns:
+            for t in thresholds:
+                row.append((group[h] > t).sum()) # number of observations strictly above the threshold
+        outputData.append(row)
+    pd.DataFrame(outputData, columns = headers).to_csv(utils.removeExtension(args.eventFilename)+'-aggregated.csv', index = False)