Source code for pysph.solver.output

"""
An interface to output the data in various format
"""

import numpy
import os

from pysph.base.particle_array import ParticleArray
from pysph.base.utils import get_particles_info, get_particle_array
from pysph import has_h5py

output_formats = ('hdf5', 'npz')


class Output(object):
    """ Class that handles output for simulation """
    def __init__(self, detailed_output=False, only_real=True, mpi_comm=None,
                 compress=False):
        self.compress = compress
        self.detailed_output = detailed_output
        self.only_real = only_real
        self.mpi_comm = mpi_comm

    def dump(self, fname, particles, solver_data):
        self.particle_data = dict(get_particles_info(particles))
        self.all_array_data = {}
        for array in particles:
            self.all_array_data[array.name] = array.get_property_arrays(
                all=self.detailed_output,
                only_real=self.only_real
                )
        mpi_comm = self.mpi_comm
        if mpi_comm is not None:
            self.all_array_data = self._gather_array_data(
                    self.all_array_data, mpi_comm
                    )
        self.solver_data = solver_data
        self._dump(fname)

    def load(self, fname):
        return self._load(fname)

    def _gather_array_data(self, all_array_data, comm):
        """Given array_data from the current processor and an MPI
        communicator,return a joined array_data from all processors
        on rank 0 and the same array_data on the other machines.
        """

        array_names = all_array_data.keys()

        # gather the data from all processors
        collected_data = comm.gather(all_array_data, root=0)

        if comm.Get_rank() == 0:
            all_array_data = {}
            size = comm.Get_size()

            # concatenate the arrays
            for array_name in array_names:
                array_data = {}
                all_array_data[array_name] = array_data

                _props = collected_data[0][array_name].keys()
                for prop in _props:
                    data = [collected_data[pid][array_name][prop]
                            for pid in range(size)]
                    prop_arr = numpy.concatenate(data)
                    array_data[prop] = prop_arr
        return all_array_data

    def _dump(self, fname):
        """ Implement the method for writing the output to a file here """
        raise NotImplementedError()

    def _load(self, fname):
        """ Implement the method for loading from file here """
        raise NotImplementedError()


class NumpyOutput(Output):

    def _dump(self, filename):
        save_method = numpy.savez_compressed if self.compress else numpy.savez
        output_data = {"particles": self.particle_data,
                       "solver_data": self.solver_data}
        for name, arrays in self.all_array_data.items():
            self.particle_data[name]["arrays"] = arrays
        save_method(filename, version=2, **output_data)

    def _load(self, fname):
        def _get_dict_from_arrays(arrays):
            arrays.shape = (1,)
            return arrays[0]
        data = numpy.load(fname)

        if 'version' not in data.files:
            msg = "Wrong file type! No version number recorded."
            raise RuntimeError(msg)
        ret = {}
        ret["arrays"] = {}
        version = data['version']
        solver_data = _get_dict_from_arrays(data["solver_data"])
        ret["solver_data"] = solver_data

        if version == 1:
            arrays = _get_dict_from_arrays(data["arrays"])
            for array_name in arrays:
                array = get_particle_array(name=array_name,
                                           **arrays[array_name])
                ret["arrays"][array_name] = array

        elif version == 2:
            particles = _get_dict_from_arrays(data["particles"])

            for array_name, array_info in particles.items():
                array = ParticleArray(name=array_name,
                                      constants=array_info["constants"],
                                      **array_info["arrays"])
                array.set_output_arrays(
                    array_info.get('output_property_arrays', [])
                )
                for prop, prop_info in array_info["properties"].items():
                    if prop not in array_info["arrays"]:
                        array.add_property(**prop_info)
                ret["arrays"][array_name] = array

        else:
            raise RuntimeError("Version not understood!")
        return ret


class HDFOutput(Output):

    def _dump(self, filename):
        import h5py
        with h5py.File(filename, 'w') as f:
            solver_grp = f.create_group('solver_data')
            particles_grp = f.create_group('particles')
            for ptype, pdata in self.particle_data.items():
                ptype_grp = particles_grp.create_group(ptype)
                arrays_grp = ptype_grp.create_group('arrays')
                data = self.all_array_data[ptype]
                self._set_constants(pdata, ptype_grp)
                self._set_properties(pdata, arrays_grp, data)
            self._set_solver_data(solver_grp)

    def _load(self, fname):
        if has_h5py():
            import h5py
        else:
            msg = "Install python-h5py to load this file"
            raise ImportError(msg)

        ret = {}
        with h5py.File(fname, 'r') as f:
            solver_grp = f['solver_data']
            particles_grp = f['particles']
            ret["solver_data"] = self._get_solver_data(solver_grp)
            ret["arrays"] = self._get_particles(particles_grp)
        return ret

    def _get_particles(self, grp):

        particles = {}
        for name, prop_array in grp.items():
            output_array = []
            const_grp = prop_array['constants']
            arrays_grp = prop_array['arrays']
            constants = self._get_constants(const_grp)
            array = ParticleArray(str(name), constants=constants)

            for pname, h5obj in arrays_grp.items():
                prop_name = str(h5obj.attrs['name'])
                type_ = str(h5obj.attrs['type'])
                default = h5obj.attrs['default']
                if h5obj.attrs['stored']:
                    output_array.append(str(pname))
                    array.add_property(
                            prop_name, type_, default, numpy.array(h5obj))
                else:
                    array.add_property(prop_name, type_)
            array.set_output_arrays(output_array)
            particles[str(name)] = array
        return particles

    def _get_solver_data(self, grp):
        solver_data = {}
        for name, value in grp.attrs.items():
            solver_data[str(name)] = value
        return solver_data

    def _get_constants(self, grp):
        constants = {}
        for const_name, const_data in grp.items():
            constants[str(const_name)] = numpy.array(const_data)
        return constants

    def _set_constants(self, pdata, ptype_grp):
        pconstants = pdata['constants']
        constGroup = ptype_grp.create_group('constants')
        for constName, constArray in pconstants.items():
            constGroup.create_dataset(constName, data=constArray)

    def _set_properties(self, pdata, ptype_grp, data):
        for propname, attributes in pdata['properties'].items():
            if propname in data:
                array = data[propname]
                if self.compress:
                    prop = ptype_grp.create_dataset(
                            propname, data=array)
                else:
                    prop = ptype_grp.create_dataset(
                            propname, data=array,
                            compression="gzip", compression_opts=9
                            )

                prop.attrs['stored'] = True
            else:
                prop = ptype_grp.create_dataset(propname, (0,))
                prop.attrs['stored'] = False

            for attname, value in attributes.items():
                if value is None:
                    value = 'None'
                prop.attrs[attname] = value

    def _set_solver_data(self, grp):
        for name, data in self.solver_data.items():
            grp.attrs[name] = data


[docs]def load(fname): """ Load the output data Parameters ---------- fname: str Name of the file or full path Examples -------- >>> data = load('elliptical_drop_100.npz') >>> data.keys() ['arrays', 'solver_data'] >>> arrays = data['arrays'] >>> arrays.keys() ['fluid'] >>> fluid = arrays['fluid'] >>> type(fluid) pysph.base.particle_array.ParticleArray >>> data['solver_data'] {'count': 100, 'dt': 4.6416394784204199e-05, 't': 0.0039955855395528766} """ if fname.endswith('npz'): output = NumpyOutput() elif fname.endswith('hdf5'): output = HDFOutput() if os.path.isfile(fname): return output.load(fname) else: msg = "File not present" raise RuntimeError(msg)
[docs]def dump(filename, particles, solver_data, detailed_output=False, only_real=True, mpi_comm=None, compress=False): """ Dump the given particles and solver data to the given filename. Parameters ---------- filename: str Filename to dump to. particles: sequence(ParticleArray) Sequence of particle arrays to dump. solver_data: dict Additional information to dump about solver state. detailed_output: bool Specifies if all arrays should be dumped. only_real: bool Only dump the real particles. mpi_comm: mpi4pi.MPI.Intracomm An MPI communicator to use for parallel commmunications. compress: bool Specify if the file is to be compressed or not. If `mpi_comm` is not passed or is set to None the local particles alone are dumped, otherwise only rank 0 dumps the output. """ if filename.endswith(output_formats): fname = os.path.splitext(filename)[0] else: fname = filename filename = fname + '.hdf5' if filename.endswith('hdf5') and has_h5py(): file_format = 'hdf5' output = HDFOutput(detailed_output, only_real, mpi_comm, compress) else: output = NumpyOutput(detailed_output, only_real, mpi_comm, compress) file_format = 'npz' filename = fname + '.' + file_format output.dump(filename, particles, solver_data)