Walks through the directory structure, collates timeseries based on the metadata in the etc dictionaries, and writes the aggregated data to a csv file.
By using a multiprocessing pool and specifying an elemlist to read_hd5, the data files can be traversed quickly.
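The script below assumes that each converted Daq carries the metadata it needs in daq.etc: a participant id, the scenario ordering, and an 'epochs' dictionary mapping epoch codes to the slices to analyze. A minimal sketch of that assumed layout (the values are purely illustrative; in practice the epoch entries are the frame slices stored when the data were converted):

etc_sketch = {'pid': 1,                    # participant id
              'scen_order': [2, 0, 3, 1],  # scenario presented on each trial
              'epochs': {11: slice(3000, 9000),     # trial 1, section 1
                         12: slice(9000, 15000)}}   # trial 1, section 2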
# Copyright (c) 2013, Roger Lew
# All rights reserved.

"""
Example timeseries reduction
"""

from __future__ import print_function

from collections import OrderedDict
import glob
import os
import multiprocessing
import time

import numpy as np

from pyvttbl import DataFrame

from undaqTools import Daq
# dependent variables and indices that we want to analyze
dvs = [('CFS_Accelerator_Pedal_Position', 0),
       ('CFS_Brake_Pedal_Force', 0),
       ('CFS_Steering_Wheel_Angle', 0),
       ('SCC_Lane_Deviation', 1),
       ('VDS_Veh_Speed', 0)]

elemlist = [elem for elem, indx in dvs]
def collate_timeseries(hd5_file):
    global dvs, elemlist

    # load only the elements we need from the hd5 file
    daq = Daq()
    daq.read_hd5(hd5_file, elemlist=elemlist)

    results_list = []
    for (epoch, fslice) in daq.etc['epochs'].items():

        # figure out pid and independent variable conditions;
        # the epoch code encodes trial in the tens digit and
        # section in the ones digit
        pid = daq.etc['pid']
        trial = epoch // 10
        scenario = daq.etc['scen_order'][trial]
        section = epoch % 10

        # pack pid and IV conditions into OrderedDict
        row = OrderedDict([('pid', pid),
                           ('trial', trial),
                           ('scenario', scenario),
                           ('section', section)])

        # summary statistics for each dependent variable over the epoch
        for (dv, indx) in dvs:
            vec = daq[dv][indx, fslice].flatten()
            row['%s_mean' % dv] = np.mean(vec)
            row['%s_min' % dv] = np.min(vec)
            row['%s_max' % dv] = np.max(vec)
            row['%s_range' % dv] = row['%s_max' % dv] - row['%s_min' % dv]
            row['%s_amean' % dv] = np.mean(np.abs(vec))  # mean absolute value
            row['%s_sd' % dv] = np.std(vec)
            row['%s_rms' % dv] = np.linalg.norm(vec) / np.sqrt(len(vec))  # root mean square

        results_list.append(row)

    return results_list
if __name__ == '__main__':
    # data is on a local SSD drive
    data_dir = 'C:\\LocalData\\Left Lane\\'

    # change the working directory of the kernel
    print("Changing wd to '%s'" % data_dir)
    os.chdir(data_dir)

    print('\nCollating timeseries measures...')
    t0 = time.time()

    hd5_files = tuple(glob.glob('*/*.hdf5'))

    # parallel worker pool; imap yields results in the same order
    # as hd5_files, so each result can be matched back to its file
    numcpus = 8
    pool = multiprocessing.Pool(numcpus)
    list_of_results_lists = pool.imap(collate_timeseries, hd5_files)

    # container to hold the collated results
    # (pyvttbl is available on PyPI)
    df = DataFrame()
    for i, result_list in enumerate(list_of_results_lists):
        print('%s analyzed.' % hd5_files[i])
        for row in result_list:
            df.insert(row)

    df.write(fname='collated_ts.csv')

    print('\nDone.\n\ncollating timeseries measures took %.1f s'
          % (time.time() - t0))
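Each row of the resulting csv describes one epoch. The first four columns (pid, trial, scenario, section) identify the epoch, followed by seven summary statistics for each dependent variable in dvs. As a sketch of the header built from the row keys above (assuming pyvttbl preserves the OrderedDict column order), the VDS_Veh_Speed columns would be:

VDS_Veh_Speed_mean, VDS_Veh_Speed_min, VDS_Veh_Speed_max, VDS_Veh_Speed_range,
VDS_Veh_Speed_amean, VDS_Veh_Speed_sd, VDS_Veh_Speed_rms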
Example Output:
Changing wd to 'C:\LocalData\Left Lane\'
Collating timeseries measures...
Part01\Left_01_20130424102744.hdf5 analyzed.
Part02\Left_02_20130425084730.hdf5 analyzed.
Part03\Left_03_20130425102301.hdf5 analyzed.
Part04\Left_04_20130425142804.hdf5 analyzed.
Part05\Left_05_20130425161122.hdf5 analyzed.
Part06\Left_06_20130426111502.hdf5 analyzed.
Part07\Left_07_20130426143846.hdf5 analyzed.
Part08\Left_08_20130426164114.hdf5 analyzed.
Part08\Left_08_20130426164301.hdf5 analyzed.
Part09\Left09_20130423155149.hdf5 analyzed.
Part10\Left10_20130423155149.hdf5 analyzed.
Part111\Left_11_20130430081052.hdf5 analyzed.
Part12\Left_12_20130429163745.hdf5 analyzed.
Part13\Left_13_20130429182923.hdf5 analyzed.
Part14\Left_14_20130430102504.hdf5 analyzed.
Part15\Left_15_20130430171947.hdf5 analyzed.
Part16\Left_16_20130501103917.hdf5 analyzed.
Part170\Left_17_20130501163745.hdf5 analyzed.
Part18\Left_18_20130502084422.hdf5 analyzed.
Part18\Left_18_reset_20130502090909.hdf5 analyzed.
Part19\Left_19_20130502153547.hdf5 analyzed.
Part200\Left_20_20130509094509.hdf5 analyzed.
Done.
collating timeseries measures took 18.0 s
Traversing 30 GB of data in 18 seconds with 100 lines of code is a pretty impressive feat if you ask me.