Example 01: Daq, and multiprocessing poolΒΆ

This is actually an early version of undaq.py The multiprocessing is easier to follow in this version before all the bells and whistles were attached.

from __future__ import print_function

import os
import glob
import time
import multiprocessing

import undaqTools

# define a function to convert a daq to hdf5
def convert_daq(daq_file):
    """
    converts a daqfile to hdf5

    Parameters
    ----------
    daq_file : string
        relative path to daq_file

    Returns
    -------
    elapsed_time : float
        returns the time it took to complete converting daq or -1 if the
        conversion fails.

    """
    t0 = time.time()

    # both multiprocessing and ipython cluster mask Exceptions
    # This way we can at least identify what file fails and the batch
    # processing continues
    #
    # Discovered this is needed the hard way. During an experimental trial
    # our MiniSim ran out of harddrive space and the resulting Daq failed to
    # load.
    try:
        daq = undaqTools.Daq()
        daq.read(daq_file)
        daq.write_hd5(daq_file.replace('.daq', '.hdf5'))
        del daq
        return time.time()-t0
    except:
        return -1

if __name__ == '__main__':

    # data is on a local SSD drive. This is very important for performance.
    data_dir = 'C:\\LocalData\\Left Lane\\'

    # change the directory of the kernel
    print("Changing wd to '%s"%data_dir)
    os.chdir(data_dir)

    # specifies whether to convert all the daqs or only the ones that haven't
    # been created. Unless you muck with the Daq or DynObj classes there it
    # should be fine to leave this False
    rebuild_all = False

    # The clients may be RAM limited. In this example the machine has 8 cores
    # but we are only using 6 to convert the daq files (with this particular
    # dataset) (Machine has 32GB of memory and daqfiles are ~ 1.5 GB each,
    # memory peaks at about 29 GB with no other applications.)
    numcpus = 6

    # parallel worker pool
    pool = multiprocessing.Pool(numcpus)

    # find all hdf5 files and all the daq files
    # we don't want to convert the daq files unless we have to
    #
    # data is organized such that every participant has his or her
    # own folder. Casting as tuples isn't strictly necessary. But this way
    # this ensures they are immutable.
    hd5_files = tuple(glob.glob('*/*.hdf5'))
    daq_files = tuple(glob.glob('*/*.daq'))

    # need to build list of daq_files to convert to pass to convert_daq
    if rebuild_all:
        daqs2convert = daq_files
    else:
        daqs2convert = \
            [daq for daq in daq_files if daq[:-3]+'hdf5' not in hd5_files]

    # ready to roll.
    print('\nConverting daqs (this may take awhile)...')
    t0 = time.time() # start global time clock

    # this launches the batch processing of the daq files
    times = pool.imap(convert_daq, daqs2convert)

    # this provides feedback as the sets of files complete. Using imap
    # guarentees that the times are in the same order as daqs2convert but
    # delays receiving feedback
    for i, elapsed_time in enumerate(times):
        print('    {:<43}{:.1f} s'.format(daqs2convert[i], elapsed_time))

    elapsed_time = time.time() - t0 + 1e-6 # so throughput calc doesn't bomb
                                           # when daq2convert is empty

    # close multiprocessing pool
    pool.close()
    pool.join()

Example Output:

Changing wd to './

Converting daqs (this may take awhile)...
    pid01\Alaska_0_20130301142422.daq            59.3 s
    pid02\data reduction_20130204125617.daq      4.8 s
    pid03\Left_11_20130429102407.daq             45.7 s

Previous topic

Examples Overview

Next topic

Example 02: stat, logstream, and Daq.etc

This Page