Source code for pysph.tools.interpolator

# Standard library imports
from functools import reduce

# Library imports.
import numpy as np

# Package imports.
from pysph.base.utils import get_particle_array
from pysph.base.kernels import Gaussian
from pysph.base.nnps import LinkedListNNPS as NNPS
from pysph.sph.equation import Equation
from pysph.sph.acceleration_eval import AccelerationEval
from pysph.sph.sph_compiler import SPHCompiler


[docs]class InterpolateFunction(Equation):
[docs] def initialize(self, d_idx, d_prop, d_number_density): d_prop[d_idx] = 0.0 d_number_density[d_idx] = 0.0
[docs] def loop(self, s_idx, d_idx, s_temp_prop, d_prop, d_number_density, WIJ): d_number_density[d_idx] += WIJ d_prop[d_idx] += WIJ*s_temp_prop[s_idx]
[docs] def post_loop(self, d_idx, d_prop, d_number_density): if d_number_density[d_idx] > 1e-12: d_prop[d_idx] /= d_number_density[d_idx]
[docs]def get_bounding_box(particle_arrays, tight=False, stretch=0.05): """Find the size of the domain given a sequence of particle arrays. If `tight` is True, the bounds are tight, if not the domain is stretched along each dimension by an amount `stretch` specified as a percentage of the length along that dimension is added in each dimension. """ xmin, xmax = 1e20, -1e20 ymin, ymax = 1e20, -1e20 zmin, zmax = 1e20, -1e20 for pa in particle_arrays: x, y, z = pa.x, pa.y, pa.z xmin = min(xmin, x.min()) xmax = max(xmax, x.max()) ymin = min(ymin, y.min()) ymax = max(ymax, y.max()) zmin = min(zmin, z.min()) zmax = max(zmax, z.max()) bounds = np.asarray((xmin, xmax, ymin, ymax, zmin, zmax)) if not tight: # Add the extra space. lengths = stretch*np.repeat(bounds[1::2] - bounds[::2], 2) lengths[::2] *= -1.0 bounds += lengths return bounds
[docs]def get_nx_ny_nz(num_points, bounds): """Given a number of points to use and the bounds, return a triplet of integers for a uniform mesh with approximately that many points. """ bounds = np.asarray(bounds, dtype=float) length = bounds[1::2] - bounds[::2] total_length = length.sum() rel_length = length/total_length non_zero = rel_length > 1e-3 dim = int(non_zero.sum()) volume = np.prod(length[non_zero]) delta = pow(volume/num_points, 1.0/dim) dimensions = np.ones(3, dtype=int) for i in range(3): if rel_length[i] > 1e-4: dimensions[i] = int(round(length[i]/delta)) return dimensions
[docs]class Interpolator(object): """Convenient class to interpolate particle properties onto a uniform grid or given set of particles. This is particularly handy for visualization. """ def __init__(self, particle_arrays, num_points=125000, kernel=None, x=None, y=None, z=None, domain_manager=None, equations=None): """ The x, y, z coordinates need not be specified, and if they are not, the bounds of the interpolated domain is automatically computed and `num_points` number of points are used in this domain uniformly placed. Parameters ---------- particle_arrays: list A list of particle arrays. num_points: int the number of points to interpolate on to. kernel: Kernel the kernel to use for interpolation. x: ndarray the x-coordinate of points on which to interpolate. y: ndarray the y-coordinate of points on which to interpolate. z: ndarray the z-coordinate of points on which to interpolate. domain_manager: DomainManager An optional Domain manager for periodic domains. equations: sequence A sequence of equations or groups. Defaults to None. This is used only if the default interpolation equations are inadequate. """ self._set_particle_arrays(particle_arrays) bounds = get_bounding_box(self.particle_arrays) shape = get_nx_ny_nz(num_points, bounds) self.dim = 3 - list(shape).count(1) if kernel is None: self.kernel = Gaussian(dim=self.dim) else: self.kernel = kernel self.pa = None self.nnps = None self.equations = equations self.func_eval = None self.domain_manager = domain_manager if x is None and y is None and z is None: self.set_domain(bounds, shape) else: self.set_interpolation_points(x=x, y=y, z=z) #### Interpolator protocol ################################################
[docs] def set_interpolation_points(self, x=None, y=None, z=None): """Set the points on which we must interpolate the arrays. If any of x, y, z is not passed it is assumed to be 0.0 and shaped like the other non-None arrays. Parameters ---------- x: ndarray the x-coordinate of points on which to interpolate. y: ndarray the y-coordinate of points on which to interpolate. z: ndarray the z-coordinate of points on which to interpolate. """ tmp = None for tmp in (x, y, z): if tmp is not None: break if tmp is None: raise RuntimeError('At least one non-None array must be given.') def _get_array(_t): return np.asarray(_t) if _t is not None else np.zeros_like(tmp) x, y, z = _get_array(x), _get_array(y), _get_array(z) self.shape = x.shape self.pa = self._create_particle_array(x, y, z) arrays = self.particle_arrays + [self.pa] if self.func_eval is None: self._compile_acceleration_eval(arrays) self.update_particle_arrays(self.particle_arrays)
[docs] def set_domain(self, bounds, shape): """Set the domain to interpolate into. Parameters ---------- bounds: tuple (xmin, xmax, ymin, ymax, zmin, zmax) shape: tuple (nx, ny, nz) """ self.bounds = np.asarray(bounds) self.shape = np.asarray(shape) x, y, z = self._create_default_points(self.bounds, self.shape) self.set_interpolation_points(x, y, z)
[docs] def interpolate(self, prop, gradient=False): """Interpolate given property. Parameters ---------- prop: str The name of the property to interpolate. gradient: bool Evaluate gradient and not function. Returns ------- A numpy array suitably shaped with the property interpolated. """ for array in self.particle_arrays: data = array.get(prop, only_real_particles=False) array.get('temp_prop', only_real_particles=False)[:] = data self.func_eval.compute(0.0, 0.1) # These are junk arguments. result = self.pa.prop.copy() result.shape = self.shape return result.squeeze()
[docs] def update_particle_arrays(self, particle_arrays): """Call this for a new set of particle arrays which have the same properties as before. For example, if you are reading the particle array data from files, each time you load a new file a new particle array is read with the same properties. Call this function to reset the arrays. """ self._set_particle_arrays(particle_arrays) arrays = self.particle_arrays + [self.pa] self._create_nnps(arrays) self.func_eval.update_particle_arrays(arrays)
#### Private protocol ##################################################### def _create_nnps(self, arrays): # create the neighbor locator object self.nnps = NNPS(dim=self.kernel.dim, particles=arrays, radius_scale=self.kernel.radius_scale, domain=self.domain_manager, cache=True) self.nnps.update() self.func_eval.set_nnps(self.nnps) def _create_default_points(self, bounds, shape): b = bounds n = shape x, y, z = np.mgrid[b[0]:b[1]:n[0]*1j, b[2]:b[3]:n[1]*1j, b[4]:b[5]:n[2]*1j, ] return x, y, z def _create_particle_array(self, x, y, z): xr = x.ravel() yr = y.ravel() zr = z.ravel() self.x, self.y, self.z = x.squeeze(), y.squeeze(), z.squeeze() hmax = self._get_max_h_in_arrays() h = hmax*np.ones_like(xr) prop = np.zeros_like(xr) pa = get_particle_array( name='interpolate', x=xr, y=yr, z=zr, h=h, number_density=np.zeros_like(xr), prop=prop, grad_x=np.zeros_like(xr), grad_y=np.zeros_like(xr), grad_z=np.zeros_like(xr) ) return pa def _compile_acceleration_eval(self, arrays): names = [x.name for x in self.particle_arrays] if self.equations is None: equations = [InterpolateFunction(dest='interpolate', sources=names)] else: equations = self.equations self.func_eval = AccelerationEval(arrays, equations, self.kernel) compiler = SPHCompiler(self.func_eval, None) compiler.compile() def _get_max_h_in_arrays(self): hmax = -1.0 for array in self.particle_arrays: hmax = max(array.h.max(), hmax) return hmax def _set_particle_arrays(self, particle_arrays): self.particle_arrays = particle_arrays self._make_all_arrays_have_same_props(particle_arrays) for array in self.particle_arrays: if 'temp_prop' not in array.properties: array.add_property('temp_prop') def _make_all_arrays_have_same_props(self, particle_arrays): """Make sure all arrays have the same props. """ all_props = reduce( set.union, [set(x.properties.keys()) for x in particle_arrays] ) for array in particle_arrays: all_props.update(array.properties.keys()) for array in particle_arrays: array_props = set(array.properties.keys()) for prop in (all_props - array_props): array.add_property(prop)
[docs]def main(fname, prop, npoint): from pysph.solver.utils import load print("Loading", fname) data = load(fname) arrays = list(data['arrays'].values()) interp = Interpolator(arrays, num_points=npoint) print(interp.shape) print("Interpolating") prop = interp.interpolate(prop) print("Visualizing") from mayavi import mlab src = mlab.pipeline.scalar_field(interp.x, interp.y, interp.z, prop) if interp.dim == 3: mlab.pipeline.scalar_cut_plane(src) else: mlab.pipeline.surface(src) mlab.pipeline.outline(src) mlab.show()
if __name__ == '__main__': import sys if len(sys.argv) < 4: print("Usage: interpolator.py filename property num_points") sys.exit(1) else: main(sys.argv[1], sys.argv[2], int(sys.argv[3]))