import itertools
import json
import socket
from multiprocessing import Pool
import numpy as np
import pymongo
from bson.objectid import ObjectId
from pychemia import Structure, HAS_PYMONGO
from pychemia.utils.periodic import atomic_symbols
class PyChemiaDB:
    """
    Wrapper around a MongoDB database holding PyChemia entries.

    Entries are stored in the collection 'pychemia_entries' and carry the
    fields 'structure', 'properties' and 'status'.
    """

    def __init__(self, name='pychemiadb', host='localhost', port=27017, user=None, passwd=None, ssl=False,
                 replicaset=None):
        """
        Creates a MongoDB client to 'host' with 'port' and connect it to the database 'name'.
        Authentication can be used with 'user' and 'passwd'

        :param name: (str) The name of the database
        :param host: (str) The host as name or IP
        :param port: (int) The number of port to connect with the server (Default is 27017)
        :param user: (str) The user with read or write permissions to the database
        :param passwd: (str,int) Password to authenticate the user into the server
        :param ssl: (bool) Forwarded to MongoClient as its 'ssl' option
        :param replicaset: (str, None) Forwarded to MongoClient as its 'replicaSet' option
        :raises ValueError: if the major version of pymongo is neither 2 nor 3
        """
        self.db_settings = {'name': name,
                            'host': host,
                            'port': port,
                            'user': user,
                            'passwd': passwd,
                            'ssl': ssl,
                            'replicaset': replicaset}
        self.name = name
        uri = self._build_uri(name, host, port, user, passwd)
        major = pymongo.version_tuple[0]
        if major == 2:
            self._client = pymongo.MongoClient(uri, ssl=ssl, replicaSet=replicaset)
        elif major == 3:
            # NOTE(review): CERT_NONE means the server certificate is not verified
            self._client = pymongo.MongoClient(uri, ssl=ssl,
                                               ssl_cert_reqs=pymongo.ssl_support.ssl.CERT_NONE,
                                               replicaSet=replicaset)
        else:
            raise ValueError('Wrong version of pymongo')
        self.db = self._client[name]
        self.entries = self.db.pychemia_entries
        self.set_minimal_schema()

    @staticmethod
    def _build_uri(name, host, port, user=None, passwd=None):
        """
        Assemble the 'mongodb://' connection URI.

        Credentials are percent-escaped so that characters such as '@', ':'
        or '/' in the user name or password do not corrupt the URI. The
        database name is appended only when a user is given; 'passwd' is
        ignored when 'user' is None.

        :param name: (str) The name of the database
        :param host: (str) The host as name or IP
        :param port: (int) Port of the server
        :param user: (str, None) User name
        :param passwd: (str, int, None) Password of the user
        :return: (str) The connection URI
        """
        try:
            from urllib.parse import quote_plus
        except ImportError:  # Python 2
            from urllib import quote_plus

        uri = 'mongodb://'
        if user is not None:
            uri += quote_plus(user)
            if passwd is not None:
                uri += ':' + quote_plus(str(passwd))
            uri += '@'
        uri += host + ':' + str(port)
        if user is not None:
            uri += '/' + name
        return uri

    def __str__(self):
        settings = self.db_settings
        ret = ' Database Name: %s\n' % self.name
        ret += ' Host: %s\n' % settings['host']
        ret += ' Port: %s\n' % settings['port']
        ret += ' User: %s\n' % settings['user']
        ret += ' SSL: %s\n' % settings['ssl']
        return ret

    def save_json(self, filename='db_settings.json'):
        """
        Write the connection settings (including the password) as JSON

        :param filename: (str) Path of the file to write
        """
        # The context manager closes the file even if serialization fails
        with open(filename, 'w') as wf:
            json.dump(self.db_settings, wf, sort_keys=True, indent=4, separators=(',', ': '))

    def insert(self, structure, properties=None, status=None):
        """
        Insert a pychemia structure instance and properties
        into the database

        :param structure: (pychemia.Structure) An instance of Pychemia's Structure
        :param properties: (dict) Dictionary of properties (an empty dict is stored if None)
        :param status: (dict) Dictionary of status (an empty dict is stored if None)
        :return: The identifier returned by the collection for the new entry
        """
        entry = {'structure': structure.to_dict,
                 'properties': {} if properties is None else properties,
                 'status': {} if status is None else status}
        return self.entries.insert(entry)

    def clean(self):
        """
        Drop the whole database and re-acquire the database and collection handles
        """
        self._client.drop_database(self.name)
        self.db = self._client[self.name]
        # Keep the collection handle bound to the refreshed database handle
        self.entries = self.db.pychemia_entries

    def update(self, entry_id, structure=None, properties=None, status=None):
        """
        Update the fields 'structure', 'properties' or 'status' for a given identifier 'entry_id'

        :param entry_id: (ObjectID, str)
        :param structure: (pychemia.Structure, dict) Structure to update
        :param properties: (dict) Dictionary of properties to update
        :param status: (dict) Status dictionary
        :return: The identifier for the entry that was updated
        :rtype : ObjectId
        """
        # One lookup serves both as the existence check and as the document to modify
        entry = self.entries.find_one({'_id': entry_id})
        assert entry is not None
        if structure is not None:
            if isinstance(structure, Structure):
                entry['structure'] = structure.to_dict
            elif isinstance(structure, dict):
                entry['structure'] = structure
            else:
                # Unsupported type: report it and leave the stored structure untouched
                print('ERROR: Could not process the structure')
                print(type(structure))
        if properties is not None:
            entry['properties'] = properties
        if status is not None:
            entry['status'] = status
        self.entries.update({'_id': entry_id}, entry)
        return entry_id

    def find_AnBm(self, specie_a=None, specie_b=None, n=1, m=1):
        """
        Search for structures with a composition expressed as AnBm
        where one and only one between A or B is fixed and the
        amounts n and m are both fixed

        :param specie_a: (str) atom symbol for the first specie
        :param specie_b: (str) atom symbol for the second specie
        :param n: number of atoms for specie 'a'
        :param m: number of atoms for specie 'b'
        :return: (list) List of ids for all the structures that fulfill
                 the conditions
        :raises ValueError: if none or both of 'specie_a' and 'specie_b' are given
        """
        if specie_a is None and specie_b is None:
            raise ValueError("Enter a specie for A or B")
        elif specie_a is not None and specie_b is not None:
            raise ValueError("Only enter A or B, not both")
        elif specie_a is not None:
            atom_fixed = specie_a
            number_fixed = n
            number_unfixed = m
        else:
            atom_fixed = specie_b
            number_fixed = m
            number_unfixed = n
        assert (atom_fixed in atomic_symbols)

        ret = []
        # The regex is only a coarse filter (e.g. 'C' also matches 'Ca'); the
        # composition lookup below is the exact membership check
        for entry in self.entries.find({'nspecies': 2, 'formula': {'$regex': atom_fixed}}):
            comp = Structure.from_dict(entry['structure']).get_composition()
            if atom_fixed not in comp.composition or comp.composition[atom_fixed] % number_fixed != 0:
                continue
            # Build a new list rather than removing from comp.species in place
            other_specie = [specie for specie in comp.species if specie != atom_fixed][0]
            # See if the fraction n/m is correct for A and B
            if number_unfixed * float(comp.composition[atom_fixed]) == number_fixed * float(
                    comp.composition[other_specie]):
                ret.append(entry['_id'])
        return ret

    def find_composition(self, composition):
        """
        Search for structures with a pseudo-composition expressed as dictionary
        where symbols that are not atomic symbols such as A or X can be used to
        represent arbitrary atoms

        :param composition: (dict) Mapping from symbol to number of atoms
        :return: (list) List of ids for all the structures that fulfill
                 the conditions
        """
        ret = []
        # dict views must be turned into sequences before numpy can sort them
        target_counts = np.array(sorted(composition.values()), dtype=np.float64)
        target_total = sum(composition.values())
        for entry in self.entries.find({'nspecies': len(composition)}):
            comp = Structure.from_dict(entry['structure']).get_composition()
            # The total number of atoms must be a multiple of the requested formula unit
            if sum(comp.composition.values()) % target_total != 0:
                continue
            counts = np.array(sorted(comp.composition.values()), dtype=np.float64)
            ratios = counts / target_counts
            # Every sorted amount must scale by the same factor
            if not np.all(ratios == ratios[0]):
                continue
            # Real atomic symbols in the request must be present in the structure
            if any(ispecie in atomic_symbols and ispecie not in comp.species for ispecie in composition):
                continue
            print(comp.composition)
            ret.append(entry['_id'])
        return ret

    def get_structure(self, entry_id):
        """
        Return the structure in the entry with id 'entry_id'

        :rtype : Structure
        """
        entry = self.entries.find_one({'_id': entry_id})
        return Structure.from_dict(entry['structure'])

    def get_dicts(self, entry_id):
        """
        Return a tuple with the fields in an entry
        structure, properties and status.
        'properties' and 'status' are None when the entry lacks them

        :rtype : tuple
        """
        entry = self.entries.find_one({'_id': entry_id})
        return entry['structure'], entry.get('properties'), entry.get('status')

    def get_entry(self, entry_id):
        """
        Return the raw entry with id 'entry_id' as returned by find_one
        """
        return self.entries.find_one({'_id': entry_id})

    def is_locked(self, entry_id):
        """
        Return if a given entry is locked by someone
        evaluating the structure contained

        :rtype : bool
        """
        entry = self.entries.find_one({'_id': entry_id})
        return 'status' in entry and entry['status'] is not None and 'lock' in entry['status']

    def lock(self, entry_id, name=None):
        """
        Set 'status.lock' on the entry

        :param entry_id: Identifier of the entry
        :param name: (str) Value stored as lock; defaults to the local host name
        """
        if name is None:
            name = socket.gethostname()
        self.entries.update({'_id': entry_id}, {'$set': {'status.lock': name}})

    def unlock(self, entry_id, name=None):
        """
        Remove 'status.lock' from the entry

        :param entry_id: Identifier of the entry
        :param name: (str) If given, the lock is removed only when it holds this value
        """
        selector = {'_id': entry_id}
        if name is not None:
            selector['status.lock'] = name
        self.entries.update(selector, {'$unset': {'status.lock': 1}})

    def set_minimal_schema(self):
        """
        Store an empty dict in 'status' and 'properties' for every entry
        matched by a query for None on that field. Matched entries are printed
        """
        for entry in self.entries.find({'status': None}, {'status': 1}):
            print(entry)
            self.entries.update({'_id': entry['_id']}, {'$set': {'status': {}}})
        for entry in self.entries.find({'properties': None}):
            print(entry)
            self.entries.update({'_id': entry['_id']}, {'$set': {'properties': {}}})

    def create_static(self, field):
        """
        Copy the value of 'field' into '<field>_static' on every entry

        :param field: (str) Name of the field to copy
        """
        for entry in self.entries.find({}):
            entry[field + '_static'] = entry[field]
            self.entries.update({'_id': entry['_id']}, entry)

    def map_to_all(self, function, nparal=6):
        """
        Apply 'function' to every entry using a pool of processes

        'function' receives a single tuple (db_settings, entry_id)

        :param function: Callable executed in the worker processes
        :param nparal: (int) Number of worker processes
        :return: (list) The values returned by 'function', one per entry
        """
        cursor = self.entries.find({}, no_cursor_timeout=True)
        try:
            entry_ids = [entry['_id'] for entry in cursor]
        finally:
            # A cursor without timeout must be closed explicitly
            cursor.close()
        print(len(entry_ids))

        tasks = [(self.db_settings, entry_id) for entry_id in entry_ids]
        pool = Pool(processes=nparal)
        try:
            ret = pool.map(function, tasks)
        finally:
            pool.close()
            pool.join()
        return ret

    def replace_failed(self):
        """
        For every entry with 'status.relaxation' equal to 'failed', clear the
        relaxation status and results and store a new random structure with
        the same composition
        """
        stale_fields = {'status.relaxation': 1,
                        'status.target_forces': 1,
                        'properties.energy': 1,
                        'properties.forces': 1,
                        'properties.stress': 1}
        for entry in self.entries.find({'status.relaxation': 'failed'}):
            st = self.get_structure(entry['_id'])
            comp = st.composition
            new_structure = Structure.random_cell(comp)
            self.entries.update({'_id': entry['_id']}, {'$unset': stale_fields})
            self.update(entry['_id'], structure=new_structure)
def get_database(db_settings):
    """
    Create a PyChemiaDB from a dictionary of settings

    Only 'name' is mandatory. Missing keys fall back to host 'localhost',
    port 27017, ssl False and replicaset None. The dictionary received is
    not modified.

    :param db_settings: (dict) Settings with the keys 'name' and optionally
                        'host', 'port', 'ssl', 'replicaset', 'user' and 'passwd'
    :return: The database object
    :rtype : PyChemiaDB
    """
    kwargs = {'name': db_settings['name'],
              'host': db_settings.get('host', 'localhost'),
              'port': db_settings.get('port', 27017),
              'ssl': db_settings.get('ssl', False),
              'replicaset': db_settings.get('replicaset')}
    if 'user' in db_settings:
        kwargs['user'] = db_settings['user']
        # A user without a stored password must not abort with KeyError
        kwargs['passwd'] = db_settings.get('passwd')
    return PyChemiaDB(**kwargs)
def object_id(entry_id):
    """
    Coerce an identifier into an ObjectId

    :param entry_id: (str, ObjectId) Identifier to convert
    :return: An ObjectId built from a str, the same object when it already is
             an ObjectId, and None for any other type
    """
    if isinstance(entry_id, ObjectId):
        return entry_id
    if isinstance(entry_id, str):
        return ObjectId(entry_id)
    return None
def create_user(name, admin_name, admin_passwd, user_name, user_passwd, host='localhost', port=27017, ssl=False,
                replicaset=None):
    """
    Register a user on the database 'name' and return a PyChemiaDB connected as that user

    The function authenticates against the 'admin' database with the
    administrator credentials before adding the user.

    :param name: (str) The name of the database
    :param admin_name: (str) The administrator name
    :param admin_passwd: (str) Administrator password
    :param user_name: (str) Username for the database
    :param user_passwd: (str) Password for the user
    :param host: (str) Name of the host for the MongoDB server (default: 'localhost')
    :param port: (int) Port to connect to the MongoDB server (default: 27017)
    :param ssl: (bool) If True enable ssl encryption for communications to host (default: False)
    :param replicaset: (str, None) Identifier of a Replica Set
    :return: (PyChemiaDB) Database object authenticated with the new user
    """
    client_options = {'host': host,
                      'port': port,
                      'ssl': ssl,
                      'ssl_cert_reqs': pymongo.ssl_support.ssl.CERT_NONE,
                      'replicaset': replicaset}
    admin_client = pymongo.MongoClient(**client_options)
    admin_client.admin.authenticate(admin_name, admin_passwd)
    target_db = admin_client[name]
    target_db.add_user(user_name, user_passwd)
    return PyChemiaDB(name=name, user=user_name, passwd=user_passwd, host=host, port=port, ssl=ssl,
                      replicaset=replicaset)
def create_database(name, admin_name, admin_passwd, user_name, user_passwd, host='localhost', port=27017, ssl=False,
                    replicaset=None):
    """
    Alias of create_user: all arguments are forwarded unchanged and its result is returned
    """
    return create_user(name=name, admin_name=admin_name, admin_passwd=admin_passwd,
                       user_name=user_name, user_passwd=user_passwd,
                       host=host, port=port, ssl=ssl, replicaset=replicaset)
def has_connection():
    """
    Tell whether a MongoDB server on 'localhost' answers

    :return: (bool) False when HAS_PYMONGO is false or the server selection
             times out (the error is printed), True otherwise
    """
    if not HAS_PYMONGO:
        return False
    import pymongo

    selection_timeout_ms = 1
    client = pymongo.MongoClient("localhost", serverSelectionTimeoutMS=selection_timeout_ms)
    try:
        # Creating the client does not contact the server; a request is
        # needed to force the connection attempt
        client.server_info()
    except pymongo.errors.ServerSelectionTimeoutError as err:
        print(err)
        return False
    else:
        return True
    finally:
        # Release the probe client so repeated calls do not accumulate clients
        client.close()