"""
There are two kinds of Repositories in PyChemia
Structure Repositories where many structures are stored
Execution Repositories where the out of every calculation
is stored
Each structure contains some metadata that is accessible with
the StructureEntry object
Also each calculation has it own metadata accessible by ExecutionEntry
object
"""
import hashlib
import json as _json
import os
import uuid as _uuid
import shutil as _shutil
import math
from pychemia.core.structure import load_structure_json
from pychemia.utils.computing import deep_unicode
[docs]class StructureEntry:
"""
Defines one entry in the repository of Structures
"""
def __init__(self, structure=None, repository=None, identifier=None, original_file=None, tags=None):
"""
Creates a new Entry for Structures
If identifier is provided the corresponding Structure is load in the Entry
Otherwise a new entry is created with a UUID random identifier
Args:
identifier: (string) UUID identifier for a structure
repository: (object) The StructureRepository that will be associated
original_file: (string) Path to the original file (CIF, POSCAR, etc)
tags: (string or list) Tags that will be associated to that structure
"""
self.properties = None
if identifier is None:
self.structure = structure
self.identifier = str(_uuid.uuid4())
self.path = None
if original_file is not None:
assert (os.path.isfile(original_file))
self.original_file = original_file
self.parents = []
self.children = []
if isinstance(tags, str):
self.tags = [tags]
elif isinstance(tags, list):
self.tags = tags
elif tags is None:
self.tags = []
else:
raise ValueError('The variable tags must be a string or list of strings')
if len(self.structure.composition) == 1:
self.add_tags('pure')
elif len(self.structure.composition) == 2:
self.add_tags('binary')
elif len(self.structure.composition) == 3:
self.add_tags('ternary')
elif len(self.structure.composition) == 4:
self.add_tags('quaternary')
else:
assert (original_file is None)
assert (structure is None)
assert (tags is None)
assert (repository is not None)
self.identifier = identifier
self.repository = repository
self.path = self.repository.path + '/' + self.identifier
if not os.path.isdir(self.path):
raise ValueError("Directory not found: " + self.path)
if not os.path.isfile(self.path + '/metadata.json'):
raise ValueError("No metadata found in " + self.path)
if not os.path.isfile(self.path + '/structure.json'):
raise ValueError("No structure found in " + self.path)
self.load()
[docs] def metadatatodict(self):
ret = {'tags': self.tags,
'parents': self.parents,
'children': self.children}
return ret
[docs] def load(self):
assert isinstance(self.identifier, str)
rf = open(self.path + '/metadata.json', 'r')
self.metadatafromdict(deep_unicode(_json.load(rf)))
rf.close()
if self.tags is None:
self.tags = []
if self.children is None:
self.children = []
if self.parents is None:
self.parents = []
self.structure = load_structure_json(self.path + '/structure.json')
if os.path.isfile(self.path + '/properties.json'):
rf = open(self.path + '/properties.json', 'r')
try:
self.properties = deep_unicode(_json.load(rf))
except ValueError:
os.rename(self.path + '/properties.json', self.path + '/properties.json.FAILED')
self.properties = None
rf.close()
self.load_originals()
[docs] def load_originals(self):
orig_dir = self.path + '/original'
if os.path.isdir(orig_dir):
self.original_file = [os.path.abspath(orig_dir + '/' + x) for x in os.listdir(orig_dir)]
else:
self.original_file = []
[docs] def save(self):
if self.path is None:
self.path = self.repository.path + '/' + self.identifier
wf = open(self.path + '/metadata.json', 'w')
_json.dump(self.metadatatodict(), wf, sort_keys=True, indent=4, separators=(',', ': '))
wf.close()
self.structure.save_json(self.path + '/structure.json')
if self.properties is not None:
wf = open(self.path + '/properties.json', 'w')
_json.dump(self.properties, wf, sort_keys=True, indent=4, separators=(',', ': '))
wf.close()
if self.original_file is not None:
self.add_original_file(self.original_file)
[docs] def metadatafromdict(self, entrydict):
self.tags = entrydict['tags']
self.parents = entrydict['parents']
self.children = entrydict['children']
[docs] def add_tags(self, tags):
_add2list(tags, self.tags)
[docs] def add_parents(self, parents):
_add2list(parents, self.parents)
[docs] def add_children(self, children):
_add2list(children, self.children)
[docs] def add_original_file(self, filep):
orig_dir = self.path + '/original'
if isinstance(filep, str):
filep = [filep]
self.load_originals()
hashs = {}
for iorig in self.original_file:
rf = open(iorig, 'r')
hashs[iorig] = hashlib.sha224(rf.read()).hexdigest()
rf.close()
for ifile in filep:
assert (os.path.isfile(ifile))
rf = open(ifile, 'r')
hash_ifile = hashlib.sha224(rf.read()).hexdigest()
if hash_ifile in hashs.values():
continue
if ifile not in self.original_file:
if not os.path.isdir(orig_dir):
os.mkdir(orig_dir)
if not os.path.isfile(orig_dir + '/' + os.path.basename(ifile)):
_shutil.copy2(ifile, orig_dir)
else:
i = 0
while True:
newfile = ifile + '_' + str(i)
if not os.path.isfile(orig_dir + '/' + os.path.basename(newfile)):
_shutil.copy(ifile, orig_dir + '/' + os.path.basename(newfile))
break
else:
i += 1
self.load_originals()
def __str__(self):
ret = 'Structure: \n' + str(self.structure)
ret += '\nTags: ' + str(self.tags)
ret += '\nParents: ' + str(self.parents)
ret += '\nChildren: ' + str(self.children)
ret += '\nIdentifier: ' + str(self.identifier)
ret += '\nOriginal Files:' + str(self.original_file)
ret += '\n'
return ret
def __eq__(self, other):
ret = True
if self.structure != other.structure:
print('Not equal structure')
ret = False
elif self.children is None and other.children is not None:
ret = False
elif self.children is not None and other.children is None:
ret = False
elif self.children is not None and set(self.children) != set(other.children):
print('Not equal children')
ret = False
elif self.parents is None and other.parents is not None:
ret = False
elif self.parents is not None and other.parents is None:
ret = False
elif self.parents is not None and set(self.parents) != set(other.parents):
print('Not equal parents')
ret = False
elif self.tags is None and other.tags is not None:
ret = False
elif self.tags is not None and other.tags is None:
ret = False
elif self.tags is not None and set(self.tags) != set(other.tags):
print('Not equal tags')
ret = False
return ret
def __ne__(self, other):
return not self.__eq__(other)
[docs]class PropertiesEntry:
"""
Defines one calc in the Execution Repository
"""
def __init__(self, structure_entry):
"""
Creates a new calc repository
"""
self.entry = structure_entry
self.properties = {}
[docs] def add_property(self, name, values):
self.properties[name] = values
[docs] def save(self):
"""
Save an existing repository information
"""
wf = open(self.entry.path + '/properties.json', 'w')
_json.dump(self.properties, wf, sort_keys=True, indent=4, separators=(',', ': '))
wf.close()
[docs] def load(self):
"""
Loads an existing db from its configuration file
"""
rf = open(self.path + '/properties.json', 'r')
self.properties = deep_unicode(_json.load(rf))
rf.close()
[docs]class StructureRepository:
"""
Defines the location of the executions repository
and structure repository and methods to add, remove
and check those db
"""
def __init__(self, path):
"""
Creates new db for calculations and structures
Args:
path: (string) Directory path for the structure repository
"""
self.path = os.path.abspath(path)
if os.path.isfile(self.path + '/db.json'):
self.load()
else:
self.tags = {}
if os.path.lexists(self.path):
if not os.path.isdir(self.path):
raise ValueError('Path exists already and it is not a directory')
else:
os.mkdir(self.path)
self.save()
[docs] def todict(self):
"""
Serialize the values of the db into a dictionary
"""
repos_dict = {'tags': self.tags}
return repos_dict
[docs] def fromdict(self, repos_dict):
self.tags = repos_dict['tags']
[docs] def save(self):
"""
Save an existing repository information
"""
wf = open(self.path + '/db.json', 'w')
_json.dump(self.todict(), wf, sort_keys=True, indent=4, separators=(',', ': '))
wf.close()
[docs] def load(self):
"""
Loads an existing db from its configuration file
"""
rf = open(self.path + '/db.json', 'r')
try:
jsonload = deep_unicode(_json.load(rf))
except ValueError:
print("Error deserializing the object")
jsonload = {'tags': {}}
self.fromdict(jsonload)
rf.close()
[docs] def rebuild(self):
ids = self.get_all_entries
self.tags = {}
for ident in ids:
struct_entry = StructureEntry(identifier=ident, repository=self)
for i in struct_entry.tags:
if i in self.tags:
self.tags[i].append(ident)
else:
self.tags[i] = [ident]
self.save()
@property
def get_all_entries(self):
return [x for x in os.listdir(self.path) if os.path.isfile(self.path + '/' + x + '/metadata.json')]
def __len__(self):
return len(self.get_all_entries)
[docs] def merge2entries(self, orig, dest):
assert (orig.structure == dest.structure)
dest.add_parents(orig.parents)
dest.add_children(orig.children)
dest.add_tags(orig.tags)
if orig.original_file is not None and len(orig.original_file) > 0:
dest.add_original_file(orig.original_file)
dest.save()
self.del_entry(orig)
[docs] def clean(self):
for i in self.tags:
for j in self.tags[i]:
if not os.path.isdir(self.path + '/' + j) or not os.path.isfile(self.path + '/' + j + '/metadata.json'):
print('Removing', j)
self.tags[i].remove(j)
self.save()
[docs] def refine(self):
formulas = self.get_formulas()
for j in formulas:
print(j)
if len(formulas[j]) > 1:
for i in range(len(formulas[j]) - 1):
stru1 = StructureEntry(repository=self, identifier=formulas[j][i])
stru2 = StructureEntry(repository=self, identifier=formulas[j][i + 1])
if stru1 == stru2:
self.merge2entries(stru1, stru2)
self.save()
[docs] def merge(self, other):
"""
Add all the contents from other db into the
calling object
:param other: StructureRepository
"""
conflict_entries = []
for i in other.get_all_enties:
if i in self.get_all_entries:
other_structure = StructureEntry(repository=other, identifier=i)
this_structure = StructureEntry(repository=self, identifier=i)
if this_structure != other_structure:
conflict_entries.append(i)
if len(conflict_entries) == 0:
for i in other.get_all_enties:
if i not in self.get_all_entries:
_shutil.copytree(other.path + '/' + i, self.path + '/' + i)
else:
print('Conflict entries found, No merge done')
return conflict_entries
[docs] def add_entry(self, entry):
"""
Add a new StructureEntry into the repository
"""
entry.repository = self
entry.path = self.path + '/' + entry.identifier
if not os.path.isdir(entry.path):
os.mkdir(entry.path)
entry.save()
if entry.tags is not None:
for itag in entry.tags:
if itag in self.tags:
if entry.identifier not in self.tags[itag]:
self.tags[itag].append(entry.identifier)
else:
self.tags[itag] = [entry.identifier]
self.save()
[docs] def add_many_entries(self, list_of_entries, tag, number_threads=1):
from threading import Thread
from pychemia.external.pymatgen import cif2structure
def worker(cifs, tags, results):
results['succeed'] = []
results['failed'] = []
for icif in cifs:
try:
struct = cif2structure(icif, primitive=True)
except ValueError:
struct = None
results['failed'].append(icif)
if struct is not None:
structentry = StructureEntry(structure=struct, original_file=icif, tags=[tags])
self.add_entry(structentry)
results['succeed'].append(icif)
th = []
result_list = []
num = int(math.ceil(float(len(list_of_entries)) / number_threads))
for i in range(number_threads):
result_list.append({})
th.append(Thread(target=worker,
args=(
list_of_entries[i * num:min((i + 1) * num, len(list_of_entries))], tag,
result_list[i])))
for i in th:
i.start()
return th, result_list
[docs] def del_entry(self, entry):
print('Deleting ', entry.identifier)
for i in entry.tags:
self.tags[i].remove(entry.identifier)
_shutil.rmtree(entry.path)
def __str__(self):
ret = 'Location: ' + self.path
ret += '\nNumber of entries: ' + str(len(self))
if len(self.tags) > 0:
for itag in self.tags:
ret += '\n\t' + itag + ':'
ret += '\n' + str(self.tags[itag])
else:
ret += '\nTags: ' + str(self.tags)
return ret
[docs] def structure_entry(self, ident):
return StructureEntry(repository=self, identifier=ident)
[docs]class ExecutionRepository:
"""
Defines the location and properties of the Repository
where all the executions will be stored
"""
def __init__(self):
"""
Creates a Repository for Executions
"""
pass
def _add2list(orig, dest):
if isinstance(orig, str):
if orig not in dest:
dest.append(orig)
elif isinstance(orig, list):
for iorig in dest:
if iorig not in dest:
dest.append(iorig)