Source code for pychemia.population.relaxstructures

import random
import uuid
from fractions import gcd
import numpy as np
from ._population import Population
from pychemia import Composition, Structure, pcm_log
from pychemia.analysis import StructureAnalysis, StructureChanger, StructureMatch
from pychemia.analysis.splitting import SplitMatch
from pychemia.utils.mathematics import unit_vector
from pychemia.utils.periodic import atomic_number, covalent_radius
from pymongo import ASCENDING
from pychemia.db import get_database
from pychemia.crystal import CrystalSymmetry


[docs]class RelaxStructures(Population):
[docs] def evaluate_entry(self, entry_id): pass
def __init__(self, name, composition=None, tag='global', target_forces=1E-3, value_tol=1E-2, distance_tol=0.3, min_comp_mult=2, max_comp_mult=8, pcdb_source=None, pressure=0.0, target_stress=None, target_diag_stress=None, target_nondiag_stress=None): """ Defines a population of PyChemia Structures, The 'name' of the database is used to create the MongoDB database and the structures are uniform in composition. A specific 'tag' could be attached to differentiate the other instances running concurrently. The 'delta' argument is the scaling factor for changers and mixers. In the case of populations supported on PyChemia databases the 'new' will erase the database :param name: The name of the population. ie the name of the database :param composition: The composition uniform for all the members :param tag: A tag to differentiate different instances running concurrently :return: A new StructurePopulation object """ if composition is not None: self.composition = Composition(composition) else: self.composition = None self.tag = tag self.target_forces = target_forces self.value_tol = value_tol self.distance_tol = distance_tol self.min_comp_mult = min_comp_mult self.max_comp_mult = max_comp_mult self.pcdb_source = pcdb_source self.pressure = pressure if target_stress is None: self.target_stress = target_forces else: self.target_stress = target_stress if target_diag_stress is None: self.target_diag_stress = self.target_stress else: self.target_diag_stress = target_diag_stress if target_diag_stress is None: self.target_nondiag_stress = self.target_stress else: self.target_nondiag_stress = target_nondiag_stress self.name = name Population.__init__(self, name, tag) if self.pcdb_source is not None: self.sources = {} for i in range(min_comp_mult, max_comp_mult+1): self.sources[i] = [] for entry in self.pcdb_source.entries.find({'structure.natom': i*self.composition.natom, 'structure.nspecies': self.composition.nspecies}, {'_id': 1}): self.sources[i].append(entry['_id'])
[docs] def recover(self): data = self.get_population_info() if data is not None: self.distance_tol = data['distance_tol'] self.value_tol = data['value_tol'] self.name = data['name'] self.target_forces = data['target_forces']
[docs] def get_structure(self, entry_id): entry = self.get_entry(entry_id) return Structure.from_dict(entry['structure'])
@staticmethod
[docs] def new_identifier(): return str(uuid.uuid4())[-12:]
[docs] def new_entry(self, structure, active=True): properties = {'forces': None, 'stress': None, 'energy': None} status = {self.tag: active} entry = {'structure': structure.to_dict, 'properties': properties, 'status': status} entry_id = self.insert_entry(entry) pcm_log.debug('Added new entry: %s with tag=%s: %s' % (str(entry_id), self.tag, str(active))) return entry_id
[docs] def get_max_force_stress(self, entry_id): entry = self.get_entry(entry_id, projection={'properties': 1}) max_force = None max_diag_stress = None max_nondiag_stress = None if entry is not None and entry['properties'] is not None: properties = entry['properties'] if 'forces' in properties and 'stress' in properties: if properties['forces'] is not None and properties['stress'] is not None: forces = np.array(entry['properties']['forces']) stress = np.array(entry['properties']['stress']) max_force = np.max(np.apply_along_axis(np.linalg.norm, 1, forces)) max_diag_stress = np.max(np.abs(stress[:3])) max_nondiag_stress = np.max(np.abs(stress[4:])) return max_force, max_diag_stress, max_nondiag_stress
[docs] def is_evaluated(self, entry_id): max_force, max_diag_stress, max_nondiag_stress = self.get_max_force_stress(entry_id) if max_force is None or max_diag_stress is None or max_nondiag_stress is None: return False elif max_force < self.target_forces and max_diag_stress < self.target_diag_stress + self.pressure: if max_nondiag_stress < self.target_nondiag_stress: return True else: return False else: return False
[docs] def add_random(self, random_probability=0.3): """ Add one random structure to the population """ entry_id = None structure = Structure() if self.composition is None: raise ValueError('No composition associated to this population') factor = np.random.randint(self.min_comp_mult, self.max_comp_mult + 1) comp = self.composition.composition.copy() print("Initail composition: %s" % comp) print(Composition(comp)) print(Composition(comp).symbols) for i in comp: comp[i] *= factor new_comp = Composition(comp) for i in range(len(new_comp.symbols)): if new_comp.symbols[i] == "Mg": new_comp.symbols[i] = "Ca" else: new_comp.symbols[i] = "Mg" print(new_comp.symbols) print("###############################################################") print("New comp symbols= ", new_comp.symbols) print("###############################################################") while True: rnd = random.random() condition = {'structure.nspecies': new_comp.nspecies, 'structure.natom': new_comp.natom} if self.pcdb_source is None: rnd = 0 if len(self.sources[factor]) == 0: rnd = 0 if self.pcdb_source is None or rnd < random_probability: pcm_log.debug('Random Structure') structure = Structure.random_cell(new_comp, method='stretching', stabilization_number=5, nparal=5, periodic=True) break else: pcm_log.debug('From source') entry_id = self.sources[factor][np.random.randint(0, len(self.sources[factor]))] structure = self.pcdb_source.get_structure(entry_id) print("chosen structure from database =", structure) sym = CrystalSymmetry(structure) scale_factor = float(np.max(covalent_radius(new_comp.species)) / np.max(covalent_radius(structure.species))) reduce_scale = scale_factor ** (1. / 3) # WIH msg = 'Mult: %d natom: %d From source: %s Spacegroup: %d Scaling: %7.3f' print(msg % (factor, structure.natom, structure.formula, sym.number(), scale_factor)) # structure.set_cell(np.dot(scale_factor * np.eye(3), structure.cell)) # WIH structure.set_cell(np.dot(reduce_scale * np.eye(3), structure.cell)) # WIH print("symbols before change = ", structure.symbols) structure.symbols = new_comp.symbols print("symbols after change = ", structure.symbols) self.sources[factor].remove(entry_id) break return self.new_entry(structure), entry_id
[docs] def check_duplicates(self, ids): """ Computes duplicate structures measuring its distance when their value is larger than value_tol. If the distance is lower than 'distance_tol' the structures will be cosidered as duplicates. :param ids: :return: (dict) Dictionary of duplicates, the keys are the ids of the duplicates and the value is the structure from which the structure is duplicated. In general the energy of the 'value' is lower than the 'key' """ ret = {} selection = self.ids_sorted(ids) values = np.array([self.value(i) for i in selection]) if len(values) == 0: return ret diffs = np.ediff1d(values) for i in range(len(diffs)): idiff = diffs[i] if idiff < self.value_tol: ident1 = selection[i] ident2 = selection[i + 1] pcm_log.debug('Testing distances between %s and %s' % (str(ident1), str(ident2))) distance = self.distance(ident1, ident2) # print 'Distance = ', distance if distance < self.distance_tol: pcm_log.debug('Distance %7.3f < %7.3f' % (distance, self.distance_tol)) ret[ident2] = ident1 if len(ret) > 0: pcm_log.debug('Number of duplicates %d' % len(ret)) return ret
[docs] def get_duplicates(self, ids, fast=False): dupes_dict = {} dupes_list = [] values = {} for i in ids: values[i] = self.value(i) selection = self.ids_sorted(ids) print('Searching duplicates in %d structures' % len(selection)) for i in range(len(selection) - 1): entry_id = selection[i] value_i = values[entry_id] for j in range(i + 1, len(selection)): entry_jd = selection[j] if fast and entry_jd in dupes_list: continue value_j = values[entry_jd] if abs(value_i - value_j) < self.value_tol: distance = self.distance(entry_id, entry_jd) if distance < self.distance_tol: if entry_id in dupes_dict: dupes_dict[entry_id].append(entry_jd) else: dupes_dict[entry_id] = [entry_jd] dupes_list.append(entry_jd) return dupes_dict, [x for x in selection if x in dupes_list]
[docs] def cleaned_from_duplicates(self, ids): selection = self.ids_sorted(ids) duplicates_dict = self.check_duplicates(selection) return [x for x in selection if x not in duplicates_dict.keys()]
[docs] def distance_matrix(self, ids): ret = np.zeros((len(ids), len(ids))) for i in range(len(ids) - 1): for j in range(i, len(ids)): ret[i, j] = self.distance(ids[i], ids[j]) ret[j, i] = ret[i, j] return ret
[docs] def diff_values_matrix(self): members = self.members ret = np.zeros((len(members), len(members))) for i in range(len(members)): for j in range(i, len(members)): if self.value(members[i]) is not None and self.value(members[j]) is not None: ret[i, j] = np.abs(self.value(members[i]) - self.value(members[j])) else: ret[i, j] = float('nan') ret[j, i] = ret[i, j] return ret
[docs] def distance(self, entry_id, entry_jd, rcut=50): ids_pair = [entry_id, entry_jd] ids_pair.sort() distance_entry = self.pcdb.db.distances.find_one({'pair': ids_pair}, {'distance': 1}) self.pcdb.db.distances.create_index([("pair", ASCENDING)]) if distance_entry is None: print('Distance not in DB') fingerprints = {} for entry_ijd in [entry_id, entry_jd]: if self.pcdb.db.fingerprints.find_one({'_id': entry_ijd}) is None: structure = self.get_structure(entry_ijd) analysis = StructureAnalysis(structure, radius=rcut) x, ys = analysis.fp_oganov() fingerprint = {'_id': entry_ijd} for k in ys: atomic_number1 = atomic_number(structure.species[k[0]]) atomic_number2 = atomic_number(structure.species[k[1]]) pair = '%06d' % min(atomic_number1 * 1000 + atomic_number2, atomic_number2 * 1000 + atomic_number1) fingerprint[pair] = list(ys[k]) if self.pcdb.db.fingerprints.find_one({'_id': entry_ijd}) is None: self.pcdb.db.fingerprints.insert(fingerprint) else: self.pcdb.db.fingerprints.update({'_id': entry_ijd}, fingerprint) fingerprints[entry_ijd] = fingerprint else: fingerprints[entry_ijd] = self.pcdb.db.fingerprints.find_one({'_id': entry_ijd}) dij = [] for pair in fingerprints[entry_id]: if pair in fingerprints[entry_jd] and pair != '_id': uvect1 = unit_vector(fingerprints[entry_id][pair]) uvect2 = unit_vector(fingerprints[entry_jd][pair]) dij.append(0.5 * (1.0 - np.dot(uvect1, uvect2))) distance = float(np.mean(dij)) self.pcdb.db.distances.insert({'pair': ids_pair, 'distance': distance}) else: distance = distance_entry['distance'] return distance
[docs] def add_from_db(self, db_settings, sizemax=1): if self.composition is None: raise ValueError('No composition associated to this population') comp = Composition(self.composition) readdb = get_database(db_settings) index = 0 for entry in readdb.entries.find({'structure.formula': comp.formula, 'structure.natom': {'$lte': self.min_comp_mult * comp.natom, '$gte': self.max_comp_mult * comp.natom}}): if index < sizemax: print('Adding entry ' + str(entry['_id']) + ' from ' + readdb.name) self.new_entry(readdb.get_structure(entry['_id'])) index += 1
[docs] def move_random(self, entry_id, factor=0.2, in_place=False, kind='move'): structure = self.get_structure(entry_id) changer = StructureChanger(structure=structure) if kind == 'move': changer.random_move_many_atoms(epsilon=factor) else: # change changer.random_change(factor) if in_place: return self.set_structure(entry_id, changer.new_structure) else: return self.new_entry(changer.new_structure, active=False)
[docs] def move(self, entry_id, entry_jd, factor=0.2, in_place=False): """ Moves entry_id in the direction of entry_jd If in_place is True the movement occurs on the same address as entry_id :param factor: :param entry_id: :param entry_jd: :param in_place: :return: """ structure_mobile = self.get_structure(entry_id) structure_target = self.get_structure(entry_jd) if structure_mobile.natom != structure_target.natom: # Moving structures with different number of atoms is only implemented for smaller structures moving # towards bigger ones by making a super-cell and only if their size is smaller that 'max_comp_mult' mult1 = structure_mobile.get_composition().gcd mult2 = structure_target.get_composition().gcd lcd = mult1 * mult2 / gcd(mult1, mult2) if lcd > self.max_comp_mult: # The resulting structure is bigger than the limit # cannot move if not in_place: return self.new_entry(structure_mobile) else: return entry_id # We will move structure1 in the direction of structure2 match = StructureMatch(structure_target, structure_mobile) match.match_size() match.match_shape() match.match_atoms() displacements = match.reduced_displacement() new_reduced = match.structure2.reduced + factor * displacements new_cell = match.structure2.cell new_symbols = match.structure2.symbols new_structure = Structure(reduced=new_reduced, symbols=new_symbols, cell=new_cell) if in_place: return self.set_structure(entry_id, new_structure) else: return self.new_entry(new_structure, active=False)
def __str__(self): ret = ' Structure Population\n\n' ret += ' Name: %s\n' % self.name ret += ' Tag: %s\n' % self.tag ret += ' Target-Forces: %7.2E\n' % self.target_forces ret += ' Value tolerance: %7.2E\n' % self.value_tol ret += ' Distance tolerance: %7.2E\n\n' % self.distance_tol if self.composition is not None: ret += ' Composition: %s\n' % self.composition.formula ret += ' Minimal composition multiplier: %d\n' % self.min_comp_mult ret += ' Maximal composition multiplier: %d\n\n' % self.max_comp_mult else: ret += '\n' ret += ' Members: %d\n' % len(self.members) ret += ' Actives: %d\n' % len(self.actives) ret += ' Evaluated: %d\n' % len(self.evaluated) return ret
[docs] def value(self, entry_id): entry = self.get_entry(entry_id) structure = self.get_structure(entry_id) if 'properties' not in entry: pcm_log.debug('This entry has no properties %s' % str(entry['_id'])) return None elif entry['properties'] is None: return None elif 'energy' not in entry['properties']: pcm_log.debug('This entry has no energy in properties %s' % str(entry['_id'])) return None else: return entry['properties']['energy'] / structure.get_composition().gcd
@property def to_dict(self): return {'name': self.name, 'tag': self.tag, 'target_forces': self.target_forces, 'value_tol': self.value_tol, 'distance_tol': self.distance_tol}
[docs] def from_dict(self, population_dict): return RelaxStructures(name=population_dict['name'], tag=population_dict['tag'], target_forces=population_dict['target_forces'], value_tol=population_dict['value_tol'], distance_tol=population_dict['distance_tol'])
[docs] def cross(self, ids): assert len(ids) == 2 structure1 = self.get_structure(ids[0]) structure2 = self.get_structure(ids[1]) split_match = SplitMatch(structure1, structure2) st1, st2 = split_match.get_simple_match() entry_id = self.new_entry(st1, active=True) entry_jd = self.new_entry(st2, active=True) return entry_id, entry_jd
[docs] def str_entry(self, entry_id): struct = self.get_structure(entry_id) return str(struct)