Source code for vectorspace

import sys  
import copy
import time as T
import numpy as N

import util
from parallel import parallel_default_instance
_parallel = parallel_default_instance
import vectors as V


class VectorSpaceMatrices(object):
[docs] """Inner products and linear combinations with matrices. Kwargs: ``inner_product_weights``: 1D array or matrix of inner product weights. It corresponds to :math:`W` in inner product :math:`v_1^* W v_2`. """ def __init__(self, weights=None): self.weights = weights if self.weights is not None: self.weights = N.array(self.weights).squeeze() if self.weights is None: VectorSpaceMatrices.compute_inner_product_mat = \ VectorSpaceMatrices._IP_no_weights elif self.weights.ndim == 1: VectorSpaceMatrices.compute_inner_product_mat = \ VectorSpaceMatrices._IP_1D_weights elif self.weights.ndim == 2: self.weights = N.mat(self.weights) VectorSpaceMatrices.compute_inner_product_mat = \ VectorSpaceMatrices._IP_2D_weights else: raise ValueError('Weights must be None, 1D, or 2D') def _IP_no_weights(self, vecs1, vecs2): return N.mat(vecs1).H * N.mat(vecs2) def _IP_1D_weights(self, vecs1, vecs2): return N.mat((N.array(vecs1).conj().T * self.weights).dot(vecs2)) def _IP_2D_weights(self, vecs1, vecs2): return N.mat(vecs1).H * self.weights * N.mat(vecs2) def __eq__(self, other): if type(other) == type(self): return smart_eq(self.weights, other.weights) else: return False def lin_combine(self, basis_vecs, coeff_mat, coeff_mat_col_indices=None): return N.mat(basis_vecs) * N.mat(coeff_mat[:,coeff_mat_col_indices]) def compute_symmetric_inner_product_mat(self, vecs): return self.compute_inner_product_mat(vecs, vecs) def __eq__(self, other): if type(self) != type(other): return False return util.smart_eq(self.weights, other.weights) def __ne__(self, other): return not self.__eq__(other) class VectorSpaceHandles(object):
[docs] """Responsible for performing addition and multiplication in parallel. Kwargs: ``inner_product``: Inner product function. ``max_vecs_per_node``: Max number of vecs in memory per node. ``verbosity``: 1 prints progress and warnings, 0 prints almost nothing. ``print_interval``: Min time (secs) between printed progress messages. The class implements parallelized vector addition and scalar multiplication and is used in high-level classes in :py:mod:`pod`, :py:mod:`bpod`, :py:mod:`dmd` and :py:mod:`ltigalerkinproj`. Note: Computations are often sped up by using all available processors, even if this lowers ``max_vecs_per_node`` proportionally. However, this depends on the computer and the nature of the functions supplied, and sometimes loading from file is slower with more processors. """ def __init__(self, inner_product=None, max_vecs_per_node=None, verbosity=1, print_interval=10): """Constructor.""" self.inner_product = inner_product self.verbosity = verbosity self.print_interval = print_interval self.prev_print_time = 0. if max_vecs_per_node is None: self.max_vecs_per_node = 10000 # different default? self.print_msg('Warning: max_vecs_per_node was not specified. ' 'Assuming %d vecs can be in memory per node. Decrease ' 'max_vecs_per_node if memory errors.'%self.max_vecs_per_node) else: self.max_vecs_per_node = max_vecs_per_node if self.max_vecs_per_node < \ 2 * _parallel.get_num_procs() / _parallel.get_num_nodes(): self.max_vecs_per_proc = 2 self.print_msg('Warning: max_vecs_per_node too small for given ' 'number of nodes and procs. Assuming 2 vecs can be ' 'in memory per processor. If possible, increase ' 'max_vecs_per_node for a speedup.') else: self.max_vecs_per_proc = self.max_vecs_per_node * \ _parallel.get_num_nodes()/_parallel.get_num_procs() def _check_inner_product(self): """Check that ``inner_product`` is defined""" if self.inner_product is None: raise RuntimeError('inner product function is not defined') def print_msg(self, msg, output_channel=sys.stdout):
[docs] """Print a message from rank 0.""" if self.verbosity > 0 and _parallel.is_rank_zero(): print >> output_channel, msg def sanity_check(self, test_vec_handle):
[docs] """Check user-supplied vec handle and vec objects. Args: ``test_vec_handle``: A vector handle. The add and mult functions are tested for the vector object. This is not a complete testing, but catches some common mistakes. Raises an error if a check fails. TODO: Other things which could be tested: ``get``/``put`` doesn't effect other vecs (memory problems) """ self._check_inner_product() tol = 1e-10 test_vec = test_vec_handle.get() vec_copy = copy.deepcopy(test_vec) vec_copy_mag2 = self.inner_product(vec_copy, vec_copy) factor = 2. vec_mult = test_vec * factor if abs(self.inner_product(vec_mult, vec_mult) - vec_copy_mag2 * factor**2) > tol: raise ValueError('Multiplication of vec/mode failed') if abs(self.inner_product(test_vec, test_vec) - vec_copy_mag2) > tol: raise ValueError('Original vec modified by multiplication!') vec_add = test_vec + test_vec if abs(self.inner_product(vec_add, vec_add) - vec_copy_mag2 * 4) > tol: raise ValueError('Addition does not give correct result') if abs(self.inner_product(test_vec, test_vec) - vec_copy_mag2) > tol: raise ValueError('Original vec modified by addition!') vec_add_mult = test_vec * factor + test_vec if abs(self.inner_product(vec_add_mult, vec_add_mult) - vec_copy_mag2 * (factor + 1) ** 2) > tol: raise ValueError('Multiplication and addition of vec/mode are '+\ 'inconsistent') if abs(self.inner_product(test_vec, test_vec) - vec_copy_mag2) > tol: raise ValueError('Original vec modified by combo of mult/add!') #vecSub = 3.5*test_vec - test_vec #N.testing.assert_array_almost_equal(vecSub,2.5*test_vec) #N.testing.assert_array_almost_equal(test_vec,vec_copy) self.print_msg('Passed the sanity check') def compute_inner_product_mat(self, row_vec_handles, col_vec_handles):
[docs] """Computes the matrix of inner product combinations between vectors. Args: ``row_vec_handles``: List of row vector handles. For example BPOD adjoints, :math:`Y`. ``col_vec_handles``: List of column vector handles. For example BPOD directs, :math:`X`. Returns: ``IP_mat``: 2D array of inner products. The vecs are retrieved in memory-efficient chunks and are not all in memory at once. The row vecs and col vecs are assumed to be different. When they are the same, use :py:meth:`compute_symmetric_inner_product` for a 2x speedup. Each MPI worker (processor) is responsible for retrieving a subset of the rows and columns. The processors then send/recv columns via MPI so they can be used to compute all IPs for the rows on each MPI worker. This is repeated until all MPI workers are done with all of their row chunks. If there are 2 processors:: | x o | rank0 | x o | | x o | - | o x | rank1 | o x | | o x | In the next step, rank 0 sends column 0 to rank 1 and rank 1 sends column 1 to rank 0. The remaining IPs are filled in:: | x x | rank0 | x x | | x x | - | x x | rank1 | x x | | x x | When the number of cols and rows is not divisible by the number of processors, the processors are assigned unequal numbers of tasks. However, all processors are always part of the passing cycle. The scaling is: - num gets / processor ~ :math:`(n_r*n_c/((max-2)*n_p*n_p)) + n_r/n_p` - num MPI sends / processor ~ :math:`(n_p-1)*(n_r/((max-2)*n_p))*n_c/n_p` - num inner products / processor ~ :math:`n_r*n_c/n_p` where :math:`n_r` is number of rows, :math:`n_c` number of columns, :math:`max` is ``max_vecs_per_proc = max_vecs_per_node/num_procs_per_node``, and :math:`n_p` is the number of MPI workers (processors). If there are more rows than columns, then an internal transpose and un-transpose is performed to improve efficiency (since :math:`n_c` only appears in the scaling in the quadratic term). """ self._check_inner_product() row_vec_handles = util.make_list(row_vec_handles) col_vec_handles = util.make_list(col_vec_handles) num_cols = len(col_vec_handles) num_rows = len(row_vec_handles) if num_rows > num_cols: transpose = True temp = row_vec_handles row_vec_handles = col_vec_handles col_vec_handles = temp temp = num_rows num_rows = num_cols num_cols = temp else: transpose = False # convenience rank = _parallel.get_rank() ## Old way that worked # num_cols_per_proc_chunk is the number of cols each proc gets at once num_cols_per_proc_chunk = 1 num_rows_per_proc_chunk = self.max_vecs_per_proc - \ num_cols_per_proc_chunk # Determine how the retrieving and inner products will be split up. row_tasks = _parallel.find_assignments(range(num_rows)) col_tasks = _parallel.find_assignments(range(num_cols)) # Find max number of col tasks among all processors max_num_row_tasks = max([len(tasks) for tasks in row_tasks]) max_num_col_tasks = max([len(tasks) for tasks in col_tasks]) ## New way #if self.max_vecs_per_node > max_num_row_tasks: # num_cols_per_proc_chunk = #num_rows_per_proc_chunk = self.max_vecs_per_proc - \ # num_cols_per_proc_chunk # These variables are the number of iters through loops that retrieve # ("get") row and column vecs. num_row_get_loops = \ int(N.ceil(max_num_row_tasks*1./num_rows_per_proc_chunk)) num_col_get_loops = \ int(N.ceil(max_num_col_tasks*1./num_cols_per_proc_chunk)) if num_row_get_loops > 1: self.print_msg('Warning: The column vecs, of which ' 'there are %d, will be retrieved %d times each. Increase ' 'number of nodes or max_vecs_per_node to reduce redundant ' '"get"s for a speedup.'%(num_cols, num_row_get_loops)) # Estimate the time this will take and determine matrix datatype # (real or complex). row_vec = row_vec_handles[0].get() col_vec = col_vec_handles[0].get() # Burn the first, it sometimes contains slow imports IP_burn = self.inner_product(row_vec, col_vec) start_time = T.time() row_vec = row_vec_handles[0].get() get_time = T.time() - start_time start_time = T.time() IP = self.inner_product(row_vec, col_vec) IP_time = T.time() - start_time IP_type = type(IP) total_IP_time = (num_rows * num_cols * IP_time / _parallel.get_num_procs()) vecs_per_proc = self.max_vecs_per_node * _parallel.get_num_nodes() / \ _parallel.get_num_procs() num_gets = (num_rows*num_cols) / ((vecs_per_proc-2) * _parallel.get_num_procs()**2) + num_rows/_parallel.get_num_procs() total_get_time = num_gets * get_time self.print_msg('Computing the inner product matrix will take at least ' '%.1f minutes' % ((total_IP_time + total_get_time) / 60.)) del row_vec, col_vec # To find all of the inner product mat chunks, each # processor has a full IP_mat with size # num_rows x num_cols even though each processor is not responsible for # filling in all of these entries. After each proc fills in what it is # responsible for, the other entries remain 0's. Then, an allreduce # is done and all the IP_mats are summed. This is simpler # concatenating chunks of the IPmats. # The efficiency is not an issue, the size of the mats # are small compared to the size of the vecs for large data. IP_mat = N.mat(N.zeros((num_rows, num_cols), dtype=IP_type)) for row_get_index in xrange(num_row_get_loops): if len(row_tasks[rank]) > 0: start_row_index = min(row_tasks[rank][0] + row_get_index*num_rows_per_proc_chunk, row_tasks[rank][-1]+1) end_row_index = min(row_tasks[rank][-1]+1, start_row_index + num_rows_per_proc_chunk) row_vecs = [row_vec_handle.get() for row_vec_handle in row_vec_handles[start_row_index:end_row_index]] else: row_vecs = [] for col_get_index in xrange(num_col_get_loops): if len(col_tasks[rank]) > 0: start_col_index = min(col_tasks[rank][0] + col_get_index*num_cols_per_proc_chunk, col_tasks[rank][-1]+1) end_col_index = min(col_tasks[rank][-1]+1, start_col_index + num_cols_per_proc_chunk) else: start_col_index = 0 end_col_index = 0 # Cycle the col vecs to proc with rank -> mod(rank+1,num_procs) # Must do this for each processor, until data makes a circle col_vecs_recv = (None, None) col_indices = range(start_col_index, end_col_index) for pass_index in xrange(_parallel.get_num_procs()): #if rank==0: print 'starting pass index=',pass_index # If on the first pass, get the col vecs, no send/recv # This is all that is called when in serial, loop iterates # once. if pass_index == 0: col_vecs = [col_handle.get() for col_handle in col_vec_handles[start_col_index: end_col_index]] else: # Determine with whom to communicate dest = (rank + 1) % _parallel.get_num_procs() source = (rank - 1)%_parallel.get_num_procs() # Create unique tag based on send/recv ranks send_tag = rank * \ (_parallel.get_num_procs() + 1) + dest recv_tag = source * \ (_parallel.get_num_procs() + 1) + rank # Collect data and send/receive col_vecs_send = (col_vecs, col_indices) request = _parallel.comm.isend( col_vecs_send, dest=dest, tag=send_tag) col_vecs_recv = _parallel.comm.recv( source=source, tag=recv_tag) request.Wait() _parallel.barrier() col_indices = col_vecs_recv[1] col_vecs = col_vecs_recv[0] # Compute the IPs for this set of data col_indices stores # the indices of the IP_mat columns to be # filled in. if len(row_vecs) > 0: for row_index in xrange(start_row_index, end_row_index): for col_vec_index, col_vec in enumerate(col_vecs): IP_mat[row_index, col_indices[ col_vec_index]] = self.inner_product( row_vecs[row_index - start_row_index], col_vec) if (T.time() - self.prev_print_time) > \ self.print_interval: num_completed_IPs = (N.abs(IP_mat)>0).sum() percent_completed_IPs = (100. * num_completed_IPs* _parallel.get_num_MPI_workers()) / ( num_cols*num_rows) self.print_msg(('Completed %.1f%% of inner ' + 'products')%percent_completed_IPs, sys.stderr) self.prev_print_time = T.time() # Clear the retrieved column vecs after done this pass cycle del col_vecs # Completed a chunk of rows and all columns on all processors. del row_vecs # Assign these chunks into IP_mat. if _parallel.is_distributed(): IP_mat = _parallel.custom_comm.allreduce(IP_mat) if transpose: IP_mat = IP_mat.T percent_completed_IPs = 100. self.print_msg(('Completed %.1f%% of inner ' + 'products')%percent_completed_IPs, sys.stderr) self.prev_print_time = T.time() _parallel.barrier() return IP_mat def compute_symmetric_inner_product_mat(self, vec_handles):
[docs] """Computes an upper-triangular symmetric matrix of inner products. Args: ``vec_handles``: List of vector handles. Returns: ``IP_mat``: Numpy array of inner products. See the documentation for :py:meth:`compute_inner_product_mat` for an idea how this works. TODO: JON, write detailed documentation similar to :py:meth:`compute_inner_product_mat`. """ self._check_inner_product() vec_handles = util.make_list(vec_handles) num_vecs = len(vec_handles) # num_cols_per_chunk is the number of cols each proc gets at once. # Columns are retrieved if the matrix must be broken up into sets of # chunks. Then symmetric upper triangular portions will be computed, # followed by a rectangular piece that uses columns not already in # memory. num_cols_per_proc_chunk = 1 num_rows_per_proc_chunk = self.max_vecs_per_proc -\ num_cols_per_proc_chunk # <nprocs> chunks are computed simulaneously, making up a set. num_cols_per_chunk = num_cols_per_proc_chunk * _parallel.get_num_procs() num_rows_per_chunk = num_rows_per_proc_chunk * _parallel.get_num_procs() # <num_row_chunks> is the number of sets that must be computed. num_row_chunks = int(N.ceil(num_vecs * 1. / num_rows_per_chunk)) if num_row_chunks > 1: self.print_msg('Warning: The vecs, of which ' 'there are %d, will be retrieved %d times each. Increase ' 'number of nodes or max_vecs_per_node to reduce redundant ' '"get"s for a speedup.'%(num_vecs,num_row_chunks)) # Estimate the time this will take and determine matrix datatype # (real or complex). test_vec = vec_handles[0].get() # Burn the first, it sometimes contains slow imports IP_burn = self.inner_product(test_vec, test_vec) start_time = T.time() test_vec = vec_handles[0].get() get_time = T.time() - start_time start_time = T.time() IP = self.inner_product(test_vec, test_vec) IP_time = T.time() - start_time IP_type = type(IP) total_IP_time = (num_vecs**2 * IP_time / 2. / _parallel.get_num_procs()) vecs_per_proc = self.max_vecs_per_node * _parallel.get_num_nodes() / \ _parallel.get_num_procs() num_gets = (num_vecs**2 /2.) / ((vecs_per_proc-2) * _parallel.get_num_procs()**2) + \ num_vecs/_parallel.get_num_procs()/2. total_get_time = num_gets * get_time self.print_msg('Computing the inner product matrix will take at least ' '%.1f minutes' % ((total_IP_time + total_get_time) / 60.)) del test_vec # Use the same trick as in compute_IP_mat, having each proc # fill in elements of a num_rows x num_rows sized matrix, rather than # assembling small chunks. This is done for the triangular portions. # For the rectangular portions, the inner product mat is filled # in directly. IP_mat = N.mat(N.zeros((num_vecs, num_vecs), dtype=IP_type)) for start_row_index in xrange(0, num_vecs, num_rows_per_chunk): end_row_index = min(num_vecs, start_row_index + num_rows_per_chunk) proc_row_tasks_all = _parallel.find_assignments(range( start_row_index, end_row_index)) num_active_procs = len([task for task in \ proc_row_tasks_all if task != []]) proc_row_tasks = proc_row_tasks_all[_parallel.get_rank()] if len(proc_row_tasks)!=0: row_vecs = [vec_handle.get() for vec_handle in vec_handles[ proc_row_tasks[0]:proc_row_tasks[-1] + 1]] else: row_vecs = [] # Triangular chunks if len(proc_row_tasks) > 0: # Test that indices are consecutive if proc_row_tasks[0:] != range(proc_row_tasks[0], proc_row_tasks[-1] + 1): raise ValueError('Indices are not consecutive.') # Per-processor triangles (using only vecs in memory) for row_index in xrange(proc_row_tasks[0], proc_row_tasks[-1] + 1): # Diagonal term IP_mat[row_index, row_index] = self.\ inner_product(row_vecs[row_index - proc_row_tasks[ 0]], row_vecs[row_index - proc_row_tasks[0]]) # Off-diagonal terms for col_index in xrange(row_index + 1, proc_row_tasks[ -1] + 1): IP_mat[row_index, col_index] = self.\ inner_product(row_vecs[row_index -\ proc_row_tasks[0]], row_vecs[col_index -\ proc_row_tasks[0]]) # Number of square chunks to fill in is n * (n-1) / 2. At each # iteration we fill in n of them, so we need (n-1) / 2 # iterations (round up). for set_index in xrange(int(N.ceil((num_active_procs - 1.) / 2))): # The current proc is "sender" my_rank = _parallel.get_rank() my_row_indices = proc_row_tasks my_num_rows = len(my_row_indices) # The proc to send to is "destination" dest_rank = (my_rank + set_index + 1) % num_active_procs # This is unused? #dest_row_indices = proc_row_tasks_all[dest_rank] # The proc that data is received from is the "source" source_rank = (my_rank - set_index - 1) % num_active_procs # Find the maximum number of sends/recv to be done by any proc max_num_to_send = int(N.ceil(1. * max([len(tasks) for \ tasks in proc_row_tasks_all]) /\ num_cols_per_proc_chunk)) """ # Pad tasks with nan so that everyone has the same # number of things to send. Same for list of vecs with None. # The empty lists will not do anything when enumerated, so no # inner products will be taken. nan is inserted into the # indices because then min/max of the indices can be taken. if my_num_rows != len(row_vecs): raise ValueError('Number of rows assigned does not ' +\ 'match number of vecs in memory.') if my_num_rows > 0 and my_num_rows < max_num_to_send: my_row_indices += [N.nan] * (max_num_to_send - my_num_rows) row_vecs += [[]] * (max_num_to_send - my_num_rows) """ for send_index in xrange(max_num_to_send): # Only processors responsible for rows communicate if my_num_rows > 0: # Send row vecs, in groups of num_cols_per_proc_chunk # These become columns in the ensuing computation start_col_index = send_index * num_cols_per_proc_chunk end_col_index = min(start_col_index + num_cols_per_proc_chunk, my_num_rows) col_vecs_send = ( row_vecs[start_col_index:end_col_index], my_row_indices[start_col_index:end_col_index]) # Create unique tags based on ranks send_tag = my_rank * ( _parallel.get_num_procs() + 1) + dest_rank recv_tag = source_rank * ( _parallel.get_num_procs() + 1) + my_rank # Send and receieve data. The Wait() command after the # receive prevents a race condition not fixed by sync(). # The Wait() is very important for the non- # blocking send (though we are unsure why). request = _parallel.comm.isend(col_vecs_send, dest=dest_rank, tag=send_tag) col_vecs_recv = _parallel.comm.recv(source = source_rank, tag=recv_tag) request.Wait() col_vecs = col_vecs_recv[0] my_col_indices = col_vecs_recv[1] for row_index in xrange(my_row_indices[0], my_row_indices[-1] + 1): for col_vec_index, col_vec in enumerate(col_vecs): IP_mat[row_index, my_col_indices[ col_vec_index]] = self.inner_product( row_vecs[row_index - my_row_indices[0]], col_vec) if (T.time() - self.prev_print_time) > \ self.print_interval: num_completed_IPs = (N.abs(IP_mat)>0).sum() percent_completed_IPs = \ (100.*2*num_completed_IPs * \ _parallel.get_num_MPI_workers())/\ (num_vecs**2) self.print_msg(('Completed %.1f%% of inner ' + 'products')%percent_completed_IPs, sys.stderr) self.prev_print_time = T.time() # Sync after send/receive _parallel.barrier() # Fill in the rectangular portion next to each triangle (if nec.). # Start at index after last row, continue to last column. This part # of the code is the same as in compute_IP_mat, as of # revision 141. for start_col_index in xrange(end_row_index, num_vecs, num_cols_per_chunk): end_col_index = min(start_col_index + num_cols_per_chunk, num_vecs) proc_col_tasks = _parallel.find_assignments(range( start_col_index, end_col_index))[_parallel.get_rank()] # Pass the col vecs to proc with rank -> mod(rank+1,numProcs) # Must do this for each processor, until data makes a circle col_vecs_recv = (None, None) if len(proc_col_tasks) > 0: col_indices = range(proc_col_tasks[0], proc_col_tasks[-1]+1) else: col_indices = [] for num_passes in xrange(_parallel.get_num_procs()): # If on the first pass, get the col vecs, no send/recv # This is all that is called when in serial, loop iterates # once. if num_passes == 0: if len(col_indices) > 0: col_vecs = [col_handle.get() \ for col_handle in vec_handles[col_indices[0]:\ col_indices[-1] + 1]] else: col_vecs = [] else: # Determine whom to communicate with dest = (_parallel.get_rank() + 1) % _parallel.\ get_num_procs() source = (_parallel.get_rank() - 1) % _parallel.\ get_num_procs() # Create unique tag based on ranks send_tag = _parallel.get_rank() * (_parallel.\ get_num_procs() + 1) + dest recv_tag = source*(_parallel.get_num_procs() + 1) +\ _parallel.get_rank() # Collect data and send/receive col_vecs_send = (col_vecs, col_indices) request = _parallel.comm.isend(col_vecs_send, dest=\ dest, tag=send_tag) col_vecs_recv = _parallel.comm.recv(source=source, tag=recv_tag) request.Wait() _parallel.barrier() col_indices = col_vecs_recv[1] col_vecs = col_vecs_recv[0] # Compute the IPs for this set of data col_indices stores # the indices of the IP_mat columns to be # filled in. if len(proc_row_tasks) > 0: for row_index in xrange(proc_row_tasks[0], proc_row_tasks[-1]+1): for col_vec_index, col_vec in enumerate(col_vecs): IP_mat[row_index, col_indices[ col_vec_index]] = self.inner_product( row_vecs[row_index - proc_row_tasks[0]], col_vec) if (T.time() - self.prev_print_time) > self.print_interval: num_completed_IPs = (N.abs(IP_mat)>0).sum() percent_completed_IPs = (100.*2*num_completed_IPs * _parallel.get_num_MPI_workers())/(num_vecs**2) self.print_msg(('Completed %.1f%% of inner ' + 'products')%percent_completed_IPs, sys.stderr) self.prev_print_time = T.time() # Completed a chunk of rows and all columns on all processors. # Finished row_vecs loop, delete memory used del row_vecs # Assign the triangular portion chunks into IP_mat. if _parallel.is_distributed(): IP_mat = _parallel.custom_comm.allreduce(IP_mat) # Create a mask for the repeated values. Select values that are zero # in the upper triangular portion (not computed there) but nonzero in # the lower triangular portion (computed there). For the case where # the inner product is not perfectly symmetric, this will select the # computation done in the upper triangular portion. mask = N.multiply(IP_mat == 0, IP_mat.T != 0) # Collect values below diagonal IP_mat += N.multiply(N.triu(IP_mat.T, 1), mask) # Symmetrize matrix IP_mat = N.triu(IP_mat) + N.triu(IP_mat, 1).T percent_completed_IPs = 100. self.print_msg(('Completed %.1f%% of inner ' + 'products')%percent_completed_IPs, sys.stderr) self.prev_print_time = T.time() _parallel.barrier() return IP_mat def lin_combine(self, sum_vec_handles, basis_vec_handles, coeff_mat,
[docs] coeff_mat_col_indices=None): """Linearly combines the basis vecs and calls ``put`` on result. Args: ``sum_vec_handles``: List of handles for the sum vectors. ``basis_vec_handles``: List of handles for the basis vecs. ``coeff_mat``: Matrix with rows corresponding to a basis vecs and columns to sum (lin. comb.) vecs. The rows and columns correspond, by index, to the lists basis_vec_handles and sum_vec_handles. ``sums = basis * coeff_mat`` Kwargs: ``coeff_mat_col_indices``: List of column indices. The sum_vecs corresponding to these col indices are computed. Each processor retrieves a subset of the basis vecs to compute as many outputs as a processor can have in memory at once. Each processor computes the "layers" from the basis it is resonsible for, and for as many modes as it can fit in memory. The layers from all procs are summed together to form the sum_vecs and ``put`` ed. Scaling is: num gets/worker = :math:`n_s/(n_p*(max-2)) * n_b/n_p` passes/worker = :math:`(n_p-1) * n_s/(n_p*(max-2)) * (n_b/n_p)` scalar multiplies/worker = :math:`n_s*n_b/n_p` Where :math:`n_s` is number of sum vecs, :math:`n_b` is number of basis vecs, :math:`n_p` is number of processors, :math:`max` = ``max_vecs_per_node``. """ sum_vec_handles = util.make_list(sum_vec_handles) basis_vec_handles = util.make_list(basis_vec_handles) num_bases = len(basis_vec_handles) num_sums = len(sum_vec_handles) if coeff_mat_col_indices is not None: coeff_mat = coeff_mat[:, coeff_mat_col_indices] if num_bases != coeff_mat.shape[0]: raise ValueError(('Number of coeff_mat rows (%d) does not equal ' 'number of basis handles (%d)'%(coeff_mat.shape[0],num_bases))) if num_sums != coeff_mat.shape[1]: raise ValueError(('Number of coeff_mat cols (%d) does not equal ' 'number of output handles (%d)')%(coeff_mat.shape[1],num_sums)) # Estimate time it will take # Burn the first one for slow imports test_vec_burn = basis_vec_handles[0].get() test_vec_burn_3 = test_vec_burn + 2.*test_vec_burn del test_vec_burn, test_vec_burn_3 start_time = T.time() test_vec = basis_vec_handles[0].get() get_time = T.time() - start_time start_time = T.time() test_vec_3 = test_vec + 2.*test_vec add_scale_time = T.time() - start_time del test_vec, test_vec_3 vecs_per_worker = self.max_vecs_per_node * _parallel.get_num_nodes() / \ _parallel.get_num_MPI_workers() num_gets = num_sums/(_parallel.get_num_MPI_workers()*(\ vecs_per_worker-2)) + \ num_bases/_parallel.get_num_MPI_workers() num_add_scales = num_sums*num_bases/_parallel.get_num_MPI_workers() self.print_msg('Linear combinations will take at least %.1f minutes'% (num_gets*get_time/60. + num_add_scales*add_scale_time/60.)) # convenience rank = _parallel.get_rank() # num_bases_per_proc_chunk is the num of bases each proc gets at once. num_bases_per_proc_chunk = 1 num_sums_per_proc_chunk = self.max_vecs_per_proc - \ num_bases_per_proc_chunk basis_tasks = _parallel.find_assignments(range(num_bases)) sum_tasks = _parallel.find_assignments(range(num_sums)) # Find max number tasks among all processors max_num_basis_tasks = max([len(tasks) for tasks in basis_tasks]) max_num_sum_tasks = max([len(tasks) for tasks in sum_tasks]) # These variables are the number of iters through loops that retrieve # ("get") # and "put" basis and sum vecs. num_basis_get_iters = int(N.ceil( max_num_basis_tasks*1./num_bases_per_proc_chunk)) num_sum_put_iters = int(N.ceil( max_num_sum_tasks*1./num_sums_per_proc_chunk)) if num_sum_put_iters > 1: self.print_msg('Warning: The basis vecs, ' 'of which there are %d, will be retrieved %d times each. ' 'If possible, increase number of nodes or ' 'max_vecs_per_node to reduce redundant retrieves and get a ' 'big speedup.'%(num_bases, num_sum_put_iters)) for sum_put_index in xrange(num_sum_put_iters): if len(sum_tasks[rank]) > 0: start_sum_index = min(sum_tasks[rank][0] + sum_put_index*num_sums_per_proc_chunk, sum_tasks[rank][-1]+1) end_sum_index = min(start_sum_index+num_sums_per_proc_chunk, sum_tasks[rank][-1]+1) # Create empty list on each processor sum_layers = [None]*(end_sum_index - start_sum_index) else: start_sum_index = 0 end_sum_index = 0 sum_layers = [] for basis_get_index in xrange(num_basis_get_iters): if len(basis_tasks[rank]) > 0: start_basis_index = min(basis_tasks[rank][0] + basis_get_index*num_bases_per_proc_chunk, basis_tasks[rank][-1]+1) end_basis_index = min(start_basis_index + num_bases_per_proc_chunk, basis_tasks[rank][-1]+1) basis_indices = range(start_basis_index, end_basis_index) else: basis_indices = [] # Pass the basis vecs to proc with rank -> mod(rank+1,num_procs) # Must do this for each processor, until data makes a circle basis_vecs_recv = (None, None) for pass_index in xrange(_parallel.get_num_procs()): # If on the first pass, retrieve the basis vecs, # no send/recv. # This is all that is called when in serial, # loop iterates once. if pass_index == 0: if len(basis_indices) > 0: basis_vecs = [basis_handle.get() \ for basis_handle in basis_vec_handles[ basis_indices[0]:basis_indices[-1]+1]] else: basis_vecs = [] else: # Figure out with whom to communicate source = (_parallel.get_rank()-1) % \ _parallel.get_num_procs() dest = (_parallel.get_rank()+1) % \ _parallel.get_num_procs() #Create unique tags based on ranks send_tag = _parallel.get_rank() * \ (_parallel.get_num_procs()+1) + dest recv_tag = source*(_parallel.get_num_procs()+1) + \ _parallel.get_rank() # Send/receive data basis_vecs_send = (basis_vecs, basis_indices) request = _parallel.comm.isend(basis_vecs_send, dest=dest, tag=send_tag) basis_vecs_recv = _parallel.comm.recv( source=source, tag=recv_tag) request.Wait() _parallel.barrier() basis_indices = basis_vecs_recv[1] basis_vecs = basis_vecs_recv[0] # Compute the scalar multiplications for this set of data. # basis_indices stores the indices of the coeff_mat to # use. for sum_index in xrange(start_sum_index, end_sum_index): for basis_index, basis_vec in enumerate(basis_vecs): sum_layer = basis_vec * \ coeff_mat[basis_indices[basis_index],\ sum_index] if sum_layers[sum_index-start_sum_index] is None: sum_layers[sum_index-start_sum_index] = \ sum_layer else: sum_layers[sum_index-start_sum_index] += \ sum_layer if (T.time() - self.prev_print_time) > self.print_interval: self.print_msg( 'Completed %.1f%% of linear combinations' % (sum_index*100./len(sum_tasks[rank]))) self.prev_print_time = T.time() # Completed this set of sum vecs, puts them to memory or file for sum_index in xrange(start_sum_index, end_sum_index): sum_vec_handles[sum_index].put( sum_layers[sum_index-start_sum_index]) del sum_layers self.print_msg('Completed %.1f%% of linear combinations' % 100.) self.prev_print_time = T.time() _parallel.barrier() def __eq__(self, other):
if type(self) != type(other): return False return (self.inner_product == other.inner_product and self.verbosity == other.verbosity) def __ne__(self, other): return not self.__eq__(other)