# -*- coding: utf-8 -*-
# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2016-2017 Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
logger = logging.getLogger(__name__)
from aquaduct.utils import clui
import numpy as np
from scipy.spatial.distance import cdist, pdist
from aquaduct.geom import traces
from aquaduct.traj.inlets import Inlet, InletTypeCodes
from aquaduct.utils.helpers import is_number, lind, list_blocks_to_slices, split_list
from aquaduct.utils.helpers import tupleify, sortify, listify, arrayify1
from aquaduct.utils.maths import make_default_array
########################################################################################################################
# paths/list manipulations
# following part of code comes directly from the very initial tcl/vmd implementation
# all following functions should return list
# functions union_smartr and xor_smartr were added later in python implementation to speed up calculations
[docs]def union_full(a, b):
return [aa for aa in a if aa in b]
[docs]def union_smartr(a, b):
b_ = SmartRange(b)
return [aa for aa in a if b_.isin(aa)]
[docs]def union(a, b, smartr=True):
if smartr:
return union_smartr(a, b)
return union_full(a, b)
[docs]def glue(a, b):
if a[-1] >= b[0]:
g = list(set(a + b))
g.sort()
return g
return []
[docs]@listify
def xor_full(a, b):
ab = union(a, b)
for e in glue(a, b):
if e not in ab:
yield e
[docs]@listify
def xor_smartr(a, b):
ab = SmartRange(union_smartr(a, b))
for e in glue(a, b):
if not ab.isin(e):
yield e
[docs]def xor(a, b, smartr=True):
if smartr:
return xor_smartr(a, b)
return xor_full(a, b)
[docs]def left(a, b, smartr=True):
return union(a, xor(a, b, smartr=smartr), smartr=smartr)
[docs]def right(a, b, smartr=True):
return union(b, xor(a, b, smartr=smartr), smartr=smartr)
########################################################################################################################
[docs]class SmartRangeFunction(object):
[docs] def __init__(self, element, times):
self.element = element
self.times = times
[docs] def __repr__(self):
return "%s(%r,%d)" % (self.__class__.__name__, self.element, self.times)
[docs] def get(self):
raise NotImplementedError('This method should be implemented in a child class.')
[docs] def rev(self):
raise NotImplementedError('This method should be implemented in a child class.')
[docs] def isin(self, element):
raise NotImplementedError('This method should be implemented in a child class.')
[docs]class SmartRangeEqual(SmartRangeFunction):
[docs] def get(self):
return [self.element] * self.times
[docs] def rev(self):
return self
[docs] def isin(self, element):
return element == self.element
[docs]class SmartRangeIncrement(SmartRangeFunction):
[docs] def get(self):
return (self.element + i for i in xrange(self.times))
[docs] def rev(self):
return SmartRangeDecrement(self.element + self.times - 1, self.times)
[docs] def isin(self, element):
return (element >= self.element) and (element <= self.element + self.times - 1)
[docs]class SmartRangeDecrement(SmartRangeFunction):
[docs] def get(self):
return (self.element - i for i in xrange(self.times))
[docs] def rev(self):
return SmartRangeIncrement(self.element - self.times + 1, self.times)
[docs] def isin(self, element):
return (element <= self.element) and (element >= self.element - self.times + 1)
[docs]class SmartRange(object):
[docs] def __init__(self, iterable=None):
self.__elements = []
self.__len = 0
self.__min = None
self.__max = None
if iterable is not None:
map(self.append, iterable)
[docs] def last_element(self):
if len(self.__elements) == 0:
return None
element = self.__elements[-1]
if isinstance(element, SmartRangeFunction):
return element.element
return element
[docs] def last_times(self):
if len(self.__elements) == 0:
return 0
element = self.__elements[-1]
if isinstance(element, SmartRangeFunction):
return element.times
return 1
@property
@listify
def raw(self):
for element in self.__elements:
if not isinstance(element, SmartRangeFunction):
yield SmartRangeEqual(element, 1)
else:
yield element
[docs] def append(self, element):
assert not isinstance(element, SmartRangeFunction)
if len(self.__elements) == 0:
self.__elements.append(element)
self.__min = element
self.__max = element
else:
if element == self.last_element():
if isinstance(self.__elements[-1], SmartRangeEqual) or (
not isinstance(self.__elements[-1], SmartRangeFunction)):
self.__elements[-1] = SmartRangeEqual(element, self.last_times() + 1)
else:
self.__elements.append(element)
else:
if not is_number(element):
self.__elements.append(element)
else:
if element - self.last_times() == self.last_element():
if isinstance(self.__elements[-1], SmartRangeIncrement) or (
not isinstance(self.__elements[-1], SmartRangeFunction)):
self.__elements[-1] = SmartRangeIncrement(self.last_element(), self.last_times() + 1)
else:
self.__elements.append(element)
elif element + self.last_times() == self.last_element():
if isinstance(self.__elements[-1], SmartRangeDecrement) or (
not isinstance(self.__elements[-1], SmartRangeFunction)):
self.__elements[-1] = SmartRangeDecrement(self.last_element(), self.last_times() + 1)
else:
self.__elements.append(element)
else:
self.__elements.append(element)
if element > self.__max:
self.__max = element
if element < self.__min:
self.__min = element
self.__len += 1
[docs] def get(self):
for element in self.__elements:
if not isinstance(element, SmartRangeFunction):
yield element
else:
for e in element.get():
yield e
[docs] def rev(self):
elements = []
for e in self.__elements[::-1]:
if isinstance(e, SmartRangeFunction):
elements.append(e.rev())
else:
elements.append(e)
self.__elements = elements
[docs] def __len__(self):
return self.__len
[docs] def min(self):
return self.__min
[docs] def max(self):
return self.__max
[docs] def isin(self, element):
for block in self.raw:
if block.isin(element):
return True
return False
########################################################################################################################
[docs]class PathTypesCodes():
path_in_code = 'i'
path_object_code = 'c'
path_out_code = 'o'
[docs]class GenericPathTypeCodes():
object_name = 'c'
scope_name = 's'
out_name = 'n'
[docs]class GenericPaths(object, GenericPathTypeCodes):
# object to store paths... is it required?
[docs] def __init__(self, id_of_res, min_pf=None, max_pf=None):
# id is any type of object; it is used as identifier
self.id = id_of_res
self.__types = SmartRange()
self.__frames = SmartRange()
self.coords = []
# following is required to correct in and out paths that begin or end in scope and
# begin or end at the very begining of MD or at very end of MD
self.max_possible_frame = max_pf
self.min_possible_frame = min_pf
# info methods
@property
def types(self):
return list(self.__types.get())
@property
def frames(self):
return list(self.__frames.get())
@property
def max_frame(self):
return self.__frames.max()
@property
def min_frame(self):
return self.__frames.min()
# add methods
[docs] def add_coord(self, coord):
# store coord as numpy float 32
self.coords.append(make_default_array(coord))
[docs] def add_object(self, frame):
self.add_type(frame, self.object_name)
[docs] def add_scope(self, frame):
self.add_type(frame, self.scope_name)
[docs] def add_type(self, frame, ftype):
self.__types.append(ftype)
self.__frames.append(frame)
[docs] def _gpo(self):
n = len(self.__frames)
types = self.types
begin = 0
for block in self.__frames.raw:
end = begin + block.times
# get types of this block
block_frames = list(block.get())
block_types = types[begin:end]
begin = end
# now iterate over block_types in a search of out_name
if self.out_name in block_types:
while self.out_name in block_types:
to_yield = block_frames[:block_types.index(self.out_name)]
to_yield_types = block_types[:block_types.index(self.out_name)]
if len(to_yield) > 0:
while to_yield_types[0] != self.object_name:
to_yield.pop(0)
to_yield_types.pop(0)
if len(to_yield) == 0:
break
if len(to_yield) > 0:
yield to_yield
block_types = block_types[block_types.index(self.out_name):]
block_frames = block_frames[block_types.index(self.out_name):]
if len(block_frames) > 0:
while block_types[0] != self.object_name:
block_frames.pop(0)
block_types.pop(0)
if len(block_frames) == 0:
break
if len(block_frames) > 0:
yield block_frames
[docs] def _gpi(self):
n = len(self.__frames)
types = self.types
begin = 0
for block in self.__frames.raw:
end = begin + block.times
# get types of this block
block_frames = list(block.get())
block_types = types[begin:end]
begin = end
# now iterate over block_types in a search of out_name
if self.out_name in block_types:
while self.out_name in block_types:
to_yield = block_frames[:block_types.index(self.out_name)]
to_yield_types = block_types[:block_types.index(self.out_name)]
if len(to_yield) > 0:
while to_yield_types[-1] != self.object_name:
to_yield.pop(-1)
to_yield_types.pop(-1)
if len(to_yield) == 0:
break
if len(to_yield) > 0:
yield to_yield
block_types = block_types[block_types.index(self.out_name):]
block_frames = block_frames[block_types.index(self.out_name):]
if len(block_frames) > 0:
while block_types[-1] != self.object_name:
block_frames.pop(-1)
block_types.pop(-1)
if len(block_frames) == 0:
break
if len(block_frames) > 0:
yield block_frames
[docs] def get_paths_in(self):
return self._gpi()
[docs] def get_paths_out(self):
return self._gpo()
[docs] def find_paths(self, fullonly=False, smartr=True):
paths_out = list(self.get_paths_out())
paths_in = list(self.get_paths_in())
for path_in in paths_in:
path_out = paths_out[0]
path_glue = glue(path_in, path_out)
if len(path_glue) > 0:
path_out = paths_out.pop(0)
path_core = union(path_in, path_out, smartr=smartr)
path_in = left(path_in, path_out, smartr=smartr)
path_out = right(path_core, path_out, smartr=smartr)
else:
path_core = []
path_out = []
if len(path_in) == 0 or len(path_core) == 0 or len(path_out) == 0:
if fullonly:
continue
# TODO: make separate method/function for that
# correct for max/min possible frame
if self.max_possible_frame is not None:
if len(path_out) > 0:
if path_out[-1] >= self.max_possible_frame:
path_core += path_out
path_out = []
if self.min_possible_frame is not None:
if len(path_in) > 0:
if path_in[0] <= self.min_possible_frame:
path_core = path_in + path_core
path_in = []
yield path_in, path_core, path_out
if not fullonly:
path_in = []
path_core = []
for path_out in paths_out:
# correct for max/min possible frame
if self.max_possible_frame is not None:
if len(path_out) > 0:
if path_out[-1] >= self.max_possible_frame:
path_core += path_out
path_out = []
yield path_in, path_core, path_out
[docs] def find_paths_coords_types(self, fullonly=False):
for path in self.find_paths(fullonly=fullonly):
# yield path, self.get_single_path_coords(path), self.get_single_path_types(path)
coords, types = self.get_single_path_coords_types(path)
yield path, coords, types
[docs] def get_single_path_coords_types(self, spath):
# returns typess for single path
# single path comprises of in,scope,out parts
# TODO: join it with get_single_path_coords
p_in, p_object, p_out = spath
frames = self.frames
types = self.types
def get_be(p_):
# if len(p_) == 1:
# quit return 0,1
return frames.index(p_[0]), frames.index(p_[-1]) + 1
in_t = []
object_t = []
out_t = []
in_c = []
object_c = []
out_c = []
if len(frames) == len(types):
# not full trajectory
# p_in
if len(p_in) > 0:
b, e = get_be(p_in)
in_t = types[b:e]
in_c = self.coords[b:e]
# p_object
if len(p_object) > 0:
b, e = get_be(p_object)
object_t = types[b:e]
object_c = self.coords[b:e]
# p_out
if len(p_out) > 0:
b, e = get_be(p_out)
out_t = types[b:e]
out_c = self.coords[b:e]
else:
# full trajectory
# p_in
if len(p_in) > 0:
b, e = p_in[0], p_in[-1] + 1
in_t = types[b:e]
in_c = self.coords[b:e]
# p_object
if len(p_object) > 0:
b, e = p_object[0], p_object[-1] + 1
object_t = types[b:e]
object_c = self.coords[b:e]
# p_out
if len(p_out) > 0:
b, e = p_out[0], p_out[-1] + 1
out_t = types[b:e]
out_c = self.coords[b:e]
return (in_c, object_c, out_c), (in_t, object_t, out_t)
[docs] def barber_with_spheres(self, spheres):
# calculate big distance matrix
# distances = cdist(self.coords, [s.center for s in spheres],metric='euclidean')
# compare with radii
# tokeep = distances > np.matrix([[s.radius for s in spheres]])
if len(spheres):
tokeep = np.argwhere((cdist(self.coords, [s.center for s in spheres], metric='euclidean') > np.matrix(
[[s.radius for s in spheres]])).all(1).A1).flatten().tolist()
#tokeep = np.argwhere(tokeep).flatten().tolist()
self.coords = lind(self.coords, tokeep)
self.__types = SmartRange(lind(self.types, tokeep))
self.__frames = SmartRange(lind(self.frames, tokeep))
# SinglePathID = namedtuple('SinglePathID', 'id nr')
[docs]class SinglePathID(object):
[docs] def __init__(self, path_id=None, nr=None):
self.id = path_id
self.nr = nr
[docs] def __str__(self):
return '%d:%d' % (self.id, self.nr)
[docs] def __eq__(self, other):
return str(self) == str(other)
[docs] def __ne__(self, other):
return str(self) != str(other)
[docs]def yield_single_paths(gps, fullonly=False, progress=False):
# iterates over gps - list of GenericPaths objects and transforms them in to SinglePath objects
nr_dict = {}
for nr, gp in enumerate(gps):
path_id = gp.id
with clui.tictoc('Processing path %d' % path_id):
for paths, coords, types in gp.find_paths_coords_types(fullonly=fullonly):
if path_id in nr_dict:
nr_dict.update({path_id: (nr_dict[path_id] + 1)})
else:
nr_dict.update({path_id: 0})
if progress:
yield SinglePath(SinglePathID(path_id=path_id, nr=nr_dict[path_id]), paths, coords, types), nr
else:
yield SinglePath(SinglePathID(path_id=path_id, nr=nr_dict[path_id]), paths, coords, types)
[docs]class SinglePath(object, PathTypesCodes, InletTypeCodes):
# special class
# represents one path
empty_coords = make_default_array(np.zeros((0, 3)))
[docs] def __init__(self, path_id, paths, coords, types):
self.id = path_id
# for paths use SmartRanges and then provide methods to read them
self.__path_in, self.__path_object, self.__path_out = map(SmartRange, paths)
# similarly, do it with types
# self.path_in, self.path_object, self.path_out = paths
self.__types_in, self.__types_object, self.__types_out = map(SmartRange, types)
# self.types_in, self.types_object, self.types_out = types
self.coords_in, self.coords_object, self.coords_out = map(make_default_array, coords)
self.smooth_coords_in, self.smooth_coords_object, self.smooth_coords_out = None, None, None
self.smooth_method = None
# return np.vstack([c for c in self._coords if len(c) > 0])
@property
def path_in(self):
return list(self.__path_in.get())
@property
def path_object(self):
return list(self.__path_object.get())
@property
def path_out(self):
return list(self.__path_out.get())
@property
def types_in(self):
return list(self.__types_in.get())
@property
def types_object(self):
return list(self.__types_object.get())
@property
def types_out(self):
return list(self.__types_out.get())
# ---------------------------------------------------------------------------------------------------------------- #
# DEPRECATED
@property
def coords_first_in(self):
if len(self.__path_in) > 0:
return self.coords_in[0]
@property
def coords_last_out(self):
if len(self.__path_out) > 0:
return self.coords_out[-1]
@property
def coords_filo(self):
# first in and last out plus type!
return [(inlet, {0: self.incoming, 1: self.outgoing}[nr]) for nr, inlet in
enumerate((self.coords_first_in, self.coords_last_out)) if inlet is not None]
# ---------------------------------------------------------------------------------------------------------------- #
[docs] def get_inlets(self):
if self.has_in:
yield Inlet(coords=self.coords_in[0],
type=(InletTypeCodes.surface, InletTypeCodes.incoming),
reference=self.id)
yield Inlet(coords=self.coords_in[-1],
type=(InletTypeCodes.internal, InletTypeCodes.incoming),
reference=self.id)
if self.has_out:
yield Inlet(coords=self.coords_out[0],
type=(InletTypeCodes.internal, InletTypeCodes.outgoing),
reference=self.id)
yield Inlet(coords=self.coords_out[-1],
type=(InletTypeCodes.surface, InletTypeCodes.outgoing),
reference=self.id)
####################################################################################################################
# coords
@property
def coords(self):
return self.coords_in, self.coords_object, self.coords_out
@property
def coords_cont(self):
# returns coords as one array
return make_default_array(np.vstack([c for c in self.coords if len(c) > 0]))
####################################################################################################################
# paths
@property
def __paths(self):
return self.__path_in, self.__path_object, self.__path_out
@property
def paths(self):
return self.path_in, self.path_object, self.path_out
@property
def paths_cont(self):
return self.path_in + self.path_object + self.path_out
####################################################################################################################
# types
@property
def types(self):
# spath types
return ([self.path_in_code] * len(self.__path_in),
[self.path_object_code] * len(self.__path_object),
[self.path_out_code] * len(self.__path_out))
@property
def types_cont(self):
return ([self.path_in_code] * len(self.__path_in)) + ([self.path_object_code] * len(self.__path_object)) + (
[self.path_out_code] * len(self.__path_out))
@property
def gtypes(self):
# generic types
return self.types_in, self.types_object, self.types_out
@property
def gtypes_cont(self):
return self.types_in + self.types_object + self.types_out
@property
@tupleify
def etypes(self):
# extended types
for t, g in zip(self.types, self.gtypes):
yield [''.join(t) for t in zip(t, g)]
@property
def etypes_cont(self):
return [''.join(t) for t in zip(self.types_cont, self.gtypes_cont)]
####################################################################################################################
@property
def size(self):
return sum(map(len, self.__paths))
@property
def begins(self):
return self.paths_cont[0]
@property
def ends(self):
return self.paths_cont[-1]
@property
def has_in(self):
return len(self.__path_in) > 0
@property
def has_object(self):
return len(self.__path_object) > 0
@property
def has_out(self):
return len(self.__path_out) > 0
####################################################################################################################
[docs] @tupleify
def get_coords(self, smooth=None):
# TODO: it is not used to get smooth coords but to get coords in general, conditionally smoothed
# if smooth is not none applies smoothing
if smooth is not None:
if smooth != self.smooth_method:
self.smooth_coords_in, self.smooth_coords_object, self.smooth_coords_out = self._make_smooth_coords(
self.coords_cont, smooth)
self.smooth_method = smooth
for nr, coords in enumerate((self.smooth_coords_in, self.smooth_coords_object, self.smooth_coords_out)):
if coords is None:
yield self.coords[nr]
else:
yield coords
else:
for coords in self.coords:
yield coords
[docs] def get_coords_cont(self, smooth=None):
# returns coords as one array
return make_default_array(np.vstack([c for c in self.get_coords(smooth) if len(c) > 0]))
[docs] @tupleify
def _make_smooth_coords(self, coords, smooth=None):
# if smooth is not none applies smoothing
# smooth should be callable and should return an object of length equal to submitted one
# get continuous coords
if smooth:
coords_smooth = smooth(coords)
else:
coords_smooth = self.coords
# now lets return tupple of coords
nr = 0
for path in self.__paths:
if len(path) > 0:
yield make_default_array(coords_smooth[nr:nr + len(path)])
nr += len(path)
else:
yield self.empty_coords
[docs] def apply_smoothing(self, smooth):
# permament change!
self.coords_in, self.coords_object, self.coords_out = self._make_smooth_coords(self.coords_cont, smooth)
####################################################################################################################
[docs] def get_distance_cont(self, smooth=None, normalize=False):
length = make_default_array(
np.hstack((np.array([0]), np.cumsum(traces.diff(self.get_coords_cont(smooth=smooth))))))
if normalize:
if is_number(normalize):
norm_factor = float(normalize)
else:
norm_factor = max(length)
length = length / norm_factor
return length
[docs] def get_distance_rev_cont(self, *args, **kwargs):
length = self.get_distance_cont(*args, **kwargs)
return length[-1] - length
[docs] @arrayify1
def get_distance_both_cont(self, *args, **kwargs):
length = self.get_distance_cont(*args, **kwargs)
return (min(n, r) for n, r in zip(length, length[-1] - length))
[docs] @arrayify1
def get_velocity_cont(self, smooth=None, normalize=False):
distance = self.get_distance_cont(smooth=smooth, normalize=normalize)
return traces.derrivative(distance)
[docs] @arrayify1
def get_acceleration_cont(self, smooth=None, normalize=False):
velocity = self.get_velocity_cont(smooth=smooth, normalize=normalize)
return traces.derrivative(velocity)
####################################################################################################################
[docs]class MasterPath(SinglePath):
[docs] def __init__(self, sp):
SinglePath.__init__(self, sp.id, sp.paths, sp.coords, sp.gtypes)
self.width_cont = None
[docs] def add_width(self, width):
assert len(width) == self.size
self.width_cont = width