Example 03: Collaborate timeseries measures

Walks through a directory structure, collaborates timeseries measures based on the metadata in each file's etc dictionary, and writes the aggregated data to a CSV file.

By using a multiprocessing pool and passing an elemlist to read_hd5, the data files can be traversed quickly.

from __future__ import print_function

# Copyright (c) 2013, Roger Lew
# All rights reserved.

"""
Example timeseries reduction
"""

from collections import OrderedDict
import glob
import os
import multiprocessing
import time

import numpy as np

from pyvttbl import DataFrame

from undaqTools import Daq

# Dependent variables to analyze, given as (element name, index) pairs.
# The index is the first-axis position read from that element's data.
dvs = [
    ('CFS_Accelerator_Pedal_Position', 0),
    ('CFS_Brake_Pedal_Force', 0),
    ('CFS_Steering_Wheel_Angle', 0),
    ('SCC_Lane_Deviation', 1),
    ('VDS_Veh_Speed', 0),
]

# Just the element names, in the same order as dvs; passed to read_hd5
# as its elemlist argument.
elemlist = [pair[0] for pair in dvs]

def collaborate_timeseries(hd5_file):
    """
    Reduce the timeseries of one hd5 file to per-epoch summary measures.

    Reads the file with ``Daq.read_hd5`` (passing the module-level
    ``elemlist``) and, for every epoch in ``daq.etc['epochs']``, computes
    summary statistics for each (element, index) pair in the module-level
    ``dvs``.

    Parameters
    ----------
    hd5_file : str
        Path of the hd5 file to analyze.

    Returns
    -------
    list of OrderedDict
        One row per epoch. Each row starts with 'pid', 'trial',
        'scenario' and 'section', followed by '<dv>_mean', '<dv>_min',
        '<dv>_max', '<dv>_range', '<dv>_amean', '<dv>_sd' and '<dv>_rms'
        for every dependent variable, in the order given by ``dvs``.
    """
    # load hd5
    daq = Daq()
    daq.read_hd5(hd5_file, elemlist=elemlist)

    results_list = []
    for (epoch, fslice) in daq.etc['epochs'].items():

        # figure out pid and independent variable conditions
        pid = daq.etc['pid']

        # The tens part of the epoch id is the trial and the ones digit is
        # the section. divmod uses floor division, so trial stays an
        # integer (and a valid index into scen_order) on Python 3 as well
        # as Python 2; plain `/` would produce a float on Python 3.
        trial, section = divmod(epoch, 10)
        scenario = daq.etc['scen_order'][trial]

        # pack pid and IV conditions into OrderedDict
        row = OrderedDict([('pid', pid),
                           ('trial', trial),
                           ('scenario', scenario),
                           ('section', section)])

        for (dv, indx) in dvs:
            # samples of this measure that fall inside the epoch's slice
            vec = daq[dv][indx, fslice].flatten()

            vec_min = np.min(vec)
            vec_max = np.max(vec)

            row['%s_mean' % dv] = np.mean(vec)
            row['%s_min' % dv] = vec_min
            row['%s_max' % dv] = vec_max
            row['%s_range' % dv] = vec_max - vec_min
            row['%s_amean' % dv] = np.mean(np.abs(vec))
            row['%s_sd' % dv] = np.std(vec)
            # root mean square: ||vec|| / sqrt(N)
            row['%s_rms' % dv] = np.linalg.norm(vec) / np.sqrt(len(vec))

        results_list.append(row)

    return results_list


if __name__ == '__main__':
    # Script entry point: analyze every hdf5 file found one directory level
    # below data_dir in parallel and write all resulting rows to a single
    # csv file inside data_dir.

    # data is on a local SSD drive.
    data_dir = 'C:\\LocalData\\Left Lane\\'

    # change the working directory so that the relative glob pattern and
    # the output csv both resolve inside data_dir
    print("Changing wd to '%s'"%data_dir)
    os.chdir(data_dir)

    print('\nCollaborating timeseries measures...')
    t0 = time.time()

    # collect the files once; the tuple is indexed below to report progress
    hd5_files = tuple(glob.glob('*/*.hdf5'))

    # parallel worker pool
    numcpus = 8
    pool = multiprocessing.Pool(numcpus)
    try:
        # imap yields one results list per file, in the order of hd5_files,
        # as the workers finish them
        list_of_results_lists = pool.imap(collaborate_timeseries, hd5_files)

        # pyvttbl is in pypi
        # container to hold the collaborated results
        df = DataFrame()
        for i, result_list in enumerate(list_of_results_lists):
            print('%s analyzed.'%hd5_files[i])
            for row in result_list:
                df.insert(row)
    finally:
        # always release the worker processes, even if a file failed
        pool.close()
        pool.join()

    df.write(fname='collaborated_ts.csv')

    print('\nDone.\n\ncollaborating timeseries measures took %.1f s'%(time.time()-t0))

Example Output:

Changing wd to 'C:\LocalData\Left Lane\'

Collaborating timeseries measures...
Part01\Left_01_20130424102744.hdf5 analyzed.
Part02\Left_02_20130425084730.hdf5 analyzed.
Part03\Left_03_20130425102301.hdf5 analyzed.
Part04\Left_04_20130425142804.hdf5 analyzed.
Part05\Left_05_20130425161122.hdf5 analyzed.
Part06\Left_06_20130426111502.hdf5 analyzed.
Part07\Left_07_20130426143846.hdf5 analyzed.
Part08\Left_08_20130426164114.hdf5 analyzed.
Part08\Left_08_20130426164301.hdf5 analyzed.
Part09\Left09_20130423155149.hdf5 analyzed.
Part10\Left10_20130423155149.hdf5 analyzed.
Part111\Left_11_20130430081052.hdf5 analyzed.
Part12\Left_12_20130429163745.hdf5 analyzed.
Part13\Left_13_20130429182923.hdf5 analyzed.
Part14\Left_14_20130430102504.hdf5 analyzed.
Part15\Left_15_20130430171947.hdf5 analyzed.
Part16\Left_16_20130501103917.hdf5 analyzed.
Part170\Left_17_20130501163745.hdf5 analyzed.
Part18\Left_18_20130502084422.hdf5 analyzed.
Part18\Left_18_reset_20130502090909.hdf5 analyzed.
Part19\Left_19_20130502153547.hdf5 analyzed.
Part200\Left_20_20130509094509.hdf5 analyzed.

Done.

collaborating timeseries measures took 18.0 s

Traversing 30 GB of data in 18 seconds with 100 lines of code is a pretty impressive feat if you ask me.

Previous topic

Example 02: stat, logstream, and Daq.etc

Next topic

Example 04: Multipanel ensemble plot

This Page