Example 01: Daq and a multiprocessing pool
------------------------------------------

This is an early version of `undaq.py`. The multiprocessing is easier to
follow in this version, before all the bells and whistles were attached.

::

    from __future__ import print_function

    import os
    import glob
    import time
    import multiprocessing

    import undaqTools


    # worker function: convert one daq file to hdf5
    def convert_daq(daq_file):
        """
        Convert a daq file to hdf5.

        Parameters
        ----------
        daq_file : string
            relative path to the daq file

        Returns
        -------
        elapsed_time : float
            the time it took to convert the daq, or -1 if the
            conversion fails
        """
        t0 = time.time()

        # Both multiprocessing and IPython clusters mask exceptions.
        # Catching them here lets us identify which file fails while the
        # batch processing continues.
        #
        # Discovered this was needed the hard way: during an experimental
        # trial our MiniSim ran out of hard drive space and the resulting
        # Daq failed to load.
        try:
            daq = undaqTools.Daq()
            daq.read(daq_file)
            daq.write_hd5(daq_file.replace('.daq', '.hdf5'))
            del daq
            return time.time() - t0
        except Exception:
            return -1


    if __name__ == '__main__':

        # Data is on a local SSD drive. This is very important for
        # performance.
        data_dir = 'C:\\LocalData\\Left Lane\\'

        # change the working directory
        print("Changing wd to '%s'" % data_dir)
        os.chdir(data_dir)

        # Specifies whether to convert all the daqs or only the ones that
        # haven't been converted yet. Unless you muck with the Daq or
        # DynObj classes it should be fine to leave this False.
        rebuild_all = False

        # The workers may be RAM limited. This machine has 8 cores, but
        # only 6 are used to convert this particular dataset. (The machine
        # has 32 GB of memory, the daq files are ~1.5 GB each, and memory
        # peaks at about 29 GB with no other applications running.)
        numcpus = 6

        # parallel worker pool
        pool = multiprocessing.Pool(numcpus)

        # Find all the hdf5 files and all the daq files; we don't want to
        # convert a daq file unless we have to.
        #
        # Data is organized so that every participant has his or her own
        # folder. Casting to tuples isn't strictly necessary, but it
        # ensures they are immutable.
        hd5_files = tuple(glob.glob('*/*.hdf5'))
        daq_files = tuple(glob.glob('*/*.daq'))

        # build the list of daq files to pass to convert_daq
        if rebuild_all:
            daqs2convert = daq_files
        else:
            daqs2convert = \
                [daq for daq in daq_files if daq[:-3] + 'hdf5' not in hd5_files]

        # ready to roll
        print('\nConverting daqs (this may take awhile)...')
        t0 = time.time()  # start global time clock

        # launch the batch processing of the daq files
        times = pool.imap(convert_daq, daqs2convert)

        # This provides feedback as the files complete. Using imap
        # guarantees that the times are in the same order as daqs2convert,
        # but it delays feedback.
        for i, elapsed_time in enumerate(times):
            print('  {:<43}{:.1f} s'.format(daqs2convert[i], elapsed_time))

        elapsed_time = time.time() - t0 + 1e-6  # so the throughput calc
                                                # doesn't bomb when
                                                # daqs2convert is empty

        # close the multiprocessing pool
        pool.close()
        pool.join()

Example Output::

    Changing wd to './'

    Converting daqs (this may take awhile)...
      pid01\Alaska_0_20130301142422.daq          59.3 s
      pid02\data reduction_20130204125617.daq    4.8 s
      pid03\Left_11_20130429102407.daq           45.7 s
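
The loop above uses `pool.imap`, so the per-file times print in the same order
as `daqs2convert`; the trade-off is that feedback for a fast file can be held
up behind a slow one. If ordering doesn't matter,
`multiprocessing.Pool.imap_unordered` yields results as soon as each worker
finishes. Below is a minimal, self-contained sketch of that pattern; the
`work` function is a hypothetical stand-in for `convert_daq` and is not part
of undaqTools. ::

    from __future__ import print_function

    import time
    import multiprocessing


    def work(x):
        """Stand-in for convert_daq: sleep briefly, return the item and time."""
        t0 = time.time()
        time.sleep(0.1 * x)
        return x, time.time() - t0


    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)

        # imap_unordered yields (item, elapsed) pairs as workers finish, so
        # feedback is immediate but not in submission order. Returning the
        # item alongside the result keeps track of which input it belongs to.
        for x, elapsed in pool.imap_unordered(work, [3, 1, 2]):
            print('item {} finished in {:.2f} s'.format(x, elapsed))

        pool.close()
        pool.join()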
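
After the batch finishes, a quick sanity check is to open one of the new files
and list its top-level groups. This generic sketch assumes h5py is installed;
it does not depend on undaqTools or on the exact layout `write_hd5` produces. ::

    import glob

    import h5py

    # open the first converted file read-only and list its top-level groups
    hd5_files = glob.glob('*/*.hdf5')
    if hd5_files:
        with h5py.File(hd5_files[0], 'r') as f:
            print(hd5_files[0])
            print(list(f.keys()))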