Source code for aquaduct.traj.barber

# -*- coding: utf-8 -*-

# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2017  Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
#

import logging

logger = logging.getLogger(__name__)

from collections import namedtuple

from scipy.spatial.distance import cdist
import numpy as np

from aquaduct.utils import clui
from aquaduct.traj.reader import atom2vdw_radius
from aquaduct.utils.helpers import lind

[docs]class Sphere(namedtuple('ABSphere', 'center radius')): pass
[docs]class WhereToCut(object): # creates collection of Spheres
[docs] def __init__(self,spaths,traj_reader, selection=None, mincut=None, mincut_level=False, maxcut=None, maxcut_level=False, tovdw=False, forceempty=False): # force empty means that empty spheres are also returned with radius 0 self.forceempty = forceempty self.selection = selection self.mincut = mincut self.maxcut = maxcut self.mincut_level = mincut_level self.maxcut_level = maxcut_level self.tovdw = tovdw self.spheres = [] self.add_spheres(spaths, traj_reader)
[docs] def check_minmaxcuts(self): mincut = False mincut_val = None if self.mincut is not None: mincut = True mincut_val = float(self.mincut) logger.debug('mincut set to %0.2f', mincut_val) maxcut = False maxcut_val = None if self.maxcut is not None: maxcut = True maxcut_val = float(self.maxcut) logger.debug('maxcut set to %0.2f', maxcut_val) if mincut and maxcut: if maxcut_val < mincut_val: logger.warning('Values of mincut %0.2f and maxcut %0.2f are mutually exclusive. No spheres will be used in Auto Barber.',mincut_val,maxcut_val) if self.maxcut_level or self.mincut_level: logger.warning('Options mincut_level and maxcut_level are set to %s and %s accordingly, some spheres might be retained.') return mincut,mincut_val,maxcut,maxcut_val
[docs] def add_spheres(self, spaths, traj_reader): clui.message("Auto Barber is looking where to cut:") mincut, mincut_val, maxcut, maxcut_val = self.check_minmaxcuts() pbar = clui.pbar(len(spaths)) barber = traj_reader.parse_selection(self.selection) vdwradius = 0 for sp in spaths: centers = [] frames = [] if sp.has_in: centers.append(sp.coords_in[0]) frames.append(sp.path_in[0]) if sp.has_out: centers.append(sp.coords_out[-1]) frames.append(sp.path_out[-1]) for center, frame in zip(centers, frames): traj_reader.set_current_frame(frame) distances = cdist(np.matrix(center), barber.atom_positions(), metric='euclidean').flatten() if self.tovdw: vdwradius = atom2vdw_radius(barber.atoms[np.argmin(distances)]) radius = min(distances) - vdwradius make_sphere = True if radius <= 0: logger.debug('VdW correction resulted in <= 0 radius.') make_sphere = False if mincut and radius < mincut_val: if not self.mincut_level: logger.debug('Sphere radius %0.2f is less then mincut %0.2f', radius, mincut_val) make_sphere = False else: logger.debug('Sphere radius %0.2f leveled to mincut %0.2f', radius, mincut_val) radius = mincut_val if maxcut and radius > maxcut_val: if not self.maxcut_level: logger.debug('Sphere radius %0.2f is greater then maxcut %0.2f', radius, maxcut_val) make_sphere = False else: logger.debug('Sphere radius %0.2f leveled to maxcut %0.2f', radius, maxcut_val) radius = maxcut_val if make_sphere: logger.debug('Added sphere of radius %0.2f' % radius) self.spheres.append(Sphere(center, radius)) elif self.forceempty: logger.debug('Added sphere of radius 0') self.spheres.append(Sphere(center, 0)) pbar.update(1) pbar.finish()
[docs] def cut_thyself(self): clui.message("Barber, cut thyself:") pbar = clui.pbar(len(self.spheres)) noredundat_spheres_count = 0 while True: self.spheres.sort(key=lambda s: s.radius, reverse=True) # create matrix of coords and vector of radii spheres_coords = np.array([sphe.center for sphe in self.spheres]) spheres_radii = np.array([sphe.radius for sphe in self.spheres]) noredundat_spheres = [] while self.spheres: # topmost is the biggest one big = self.spheres.pop(0) center, radius = big # calculate distances of all to big but big distances = cdist(np.matrix(center), spheres_coords[1:], metric='euclidean').flatten() # add radii distances += spheres_radii[1:] # remove if distance <= radius to_keep = distances > radius # do keep spheres_coords = spheres_coords[1:][to_keep] spheres_radii = spheres_radii[1:][to_keep] # do keep spheres np.argwhere(to_keep).flatten() to_keep_ids = np.argwhere(to_keep).flatten().tolist() self.spheres = lind(self.spheres, to_keep_ids) # add big to non redundant noredundat_spheres.append(big) pbar.update(sum(to_keep == False)) logger.debug("Removal of redundant cutting places: done %d, to analyze %d" % ( len(noredundat_spheres), len(self.spheres))) if len(noredundat_spheres) == noredundat_spheres_count: logger.debug("Removal of redundant cutting places done. %d non redundant spheres found." % len( noredundat_spheres)) break else: noredundat_spheres_count = len(noredundat_spheres) self.spheres = noredundat_spheres self.spheres = noredundat_spheres pbar.finish()
if False: clui.message("Auto Barber in action:") pbar = clui.pbar(len(paths)) for p in paths.values(): p.barber_with_spheres(spheres) pbar.next() pbar.finish()