import os
import pymongo
from pychemia import Structure
import gridfs
from pychemia.utils.computing import hashfile
[docs]class PyChemiaQueue:
def __init__(self, name='Queue', host='localhost', port=27017, user=None, passwd=None, ssl=False, replicaset=None):
    """
    Creates a MongoDB client to 'host' with 'port' and connect it to the database 'name'.
    Authentication can be used with 'user' and 'password'

    When 'replicaset' is given, the client is built from a 'mongodb://' URI that embeds the
    (percent-escaped) credentials and, if 'user' is set, the database name. Otherwise the client
    is built from 'host'/'port' directly and the credentials are applied afterwards through
    'db.authenticate'. After connecting, the server version is printed, the minimal schema is
    enforced on 'pychemia_entries' and a GridFS handle is created on the database.

    :param name: (str) The name of the database
    :param host: (str) The host as name or IP
    :param port: (int) The number of port to connect with the server (Default is 27017)
    :param user: (str) The user with read or write permissions to the database
    :param passwd: (str/int) Password to authenticate the user into the server
    :param ssl: (bool) Whether to connect using SSL (Default is False)
    :param replicaset: (str) Name of the replica set; when given, the connection is made through a URI
    :return:
    """
    # Local import: only needed to percent-escape credentials embedded in the URI.
    from urllib.parse import quote_plus

    self.db_settings = {'name': name, 'host': host, 'port': port, 'user': user, 'passwd': passwd, 'ssl': ssl}
    self.name = name

    # Build the URI twice: the real one (credentials escaped so characters such as '@', ':' or '/'
    # in a password do not corrupt the URI) and a display copy with the password masked, so the
    # secret is never written to stdout.
    credentials = ''
    shown_credentials = ''
    if user is not None:
        credentials = quote_plus(str(user))
        shown_credentials = credentials
        if passwd is not None:
            credentials += ':' + quote_plus(str(passwd))
            shown_credentials += ':****'
        credentials += '@'
        shown_credentials += '@'
    address = host + ':' + str(port)
    uri = 'mongodb://' + credentials + address
    print('URI:', 'mongodb://' + shown_credentials + address)
    if user is not None:
        uri += '/' + name

    if replicaset is not None:
        self._client = pymongo.MongoClient(uri, ssl=ssl, replicaset=replicaset)
    else:
        # NOTE(review): certificate verification is disabled here (CERT_NONE) and this keyword is
        # only accepted by older pymongo releases; confirm this is intended before relying on SSL.
        self._client = pymongo.MongoClient(host=host, port=port, ssl=ssl,
                                           ssl_cert_reqs=pymongo.ssl_support.ssl.CERT_NONE)
    print('%20s : %s' % ('version', self._client.server_info()['version']))
    self.db = self._client[name]
    # NOTE(review): Database.authenticate was removed in pymongo 4; this relies on an older pymongo.
    if user is not None and self.db.authenticate(user, passwd):
        print('Authentication successful')
    self.set_minimal_schema()
    self.fs = gridfs.GridFS(self.db)
def add_file(self, entry_id, location, filepath):
    """
    Register the file at 'filepath' in the '<location>.files' list of the entry 'entry_id'.

    The file content is uploaded to GridFS only when no stored file already has the same hash,
    length and base name; otherwise the existing GridFS file id is reused. In both cases a
    {'file_id', 'name', 'hash'} record is added to '<location>.files' with '$addToSet', so the
    same record is not added twice.

    :param entry_id: The '_id' of the entry in 'pychemia_entries'
    :param location: (str) Section of the entry receiving the file record (e.g. 'input' or 'output')
    :param filepath: (str) Path to an existing regular file
    :raises AssertionError: If 'filepath' is not an existing file
    """
    assert os.path.isfile(filepath), 'Not a file: %s' % filepath
    hashcode = hashfile(filepath)
    filename = os.path.basename(filepath)
    length = os.path.getsize(filepath)
    existing = self.db.fs.files.find_one({'hash': hashcode, 'length': length, 'filename': filename})
    if existing is None:
        # Open the file only when its content must be uploaded, and close it afterwards.
        with open(filepath, 'rb') as rf:
            file_id = self.fs.put(rf, filename=filename, hash=hashcode)
        print('New file ', file_id)
    else:
        file_id = existing['_id']
        print('File already present ', file_id)
    record = {'file_id': file_id, 'name': filename, 'hash': hashcode}
    self.db.pychemia_entries.update({'_id': entry_id}, {'$addToSet': {location + '.files': record}})
def set_minimal_schema(self):
    """
    Ensure every document in 'pychemia_entries' has the sections 'meta', 'input', 'output' and 'job'.

    For each section, in that order, every document lacking it is reported on stdout and
    receives an empty dict for that section. The query {section: None} also matches
    documents where the field is explicitly null.
    """
    entries = self.db.pychemia_entries
    for section in ('meta', 'input', 'output', 'job'):
        for doc in entries.find({section: None}, {'_id': 1}):
            print('Missing field "%s" on' % section, doc['_id'])
            entries.update({'_id': doc['_id']}, {'$set': {section: {}}})
def set_structure(self, entry_id, location, structure):
    """
    Store a structure in the 'input' or 'output' section of an entry.

    :param entry_id: The '_id' of the entry in 'pychemia_entries'
    :param location: (str) Either 'input' or 'output'
    :param structure: (Structure) Object whose 'to_dict' attribute (read, not called) is stored
                      as '<location>.structure'
    :raises AssertionError: If 'location' is not 'input' or 'output'
    """
    assert location in ('input', 'output')
    field = '%s.structure' % location
    payload = {'$set': {field: structure.to_dict}}
    self.db.pychemia_entries.update({'_id': entry_id}, payload)
def new_entry(self, structure=None, variables=None, code=None, files=None, priority=0, dbname=None, db_id=None):
    """
    Insert a new entry in 'pychemia_entries' and optionally populate its input section.

    The entry starts with empty 'input', 'output' and 'job' sections and a 'meta' section marking
    it as not submitted, not finished and not deployed.

    :param structure: (Structure) Optional input structure, passed to set_input_structure
    :param variables: Optional input variables; must be given together with 'code'
    :param code: Name of the code the variables are meant for; must be given together with 'variables'
    :param files: Optional iterable (presumably of file paths), each passed to add_input_file
    :param priority: Value stored as 'meta.priority' (Default is 0)
    :param dbname: Value stored as 'meta.dbname'
    :param db_id: Value stored as 'meta.db_id'
    :return: The '_id' returned by the insert of the new entry
    :raises ValueError: If only one of 'variables' and 'code' is given
    """
    # 'variables' and 'code' are only meaningful together.
    if variables is not None and code is None:
        raise ValueError("Input variables require code name")
    if variables is None and code is not None:
        raise ValueError("Code name requires input variables")
    entry = {'input': {}, 'output': {}, 'job': {}, 'meta': {'submitted': False,
                                                           'priority': priority,
                                                           'finished': False,
                                                           'deployed': False,
                                                           'dbname': dbname,
                                                           'db_id': db_id}}
    entry_id = self.db.pychemia_entries.insert(entry)
    # Commented for compatibility with mongo 2.4
    # self.db.pychemia_entries.update_one({'_id': entry_id}, {'$currentDate': {'meta.CreationDate': True}})
    # set_input_structure, set_input and add_input_file are methods defined outside this chunk.
    if structure is not None:
        self.set_input_structure(entry_id, structure)
    if variables is not None:
        # The guards above guarantee 'code' is also set here.
        self.set_input(entry_id, code=code, inputvar=variables)
    if files is not None:
        for ifile in files:
            self.add_input_file(entry_id, filename=ifile)
    return entry_id
def set_job_settings(self, entry_id, nparal=None, queue=None, nhours=None, mail=None, task_name=None,
                     task_settings=None, task_kind=None):
    """
    Update fields of the 'job' section of the entry 'entry_id'.

    Every keyword argument that is not None is stored as 'job.<argument name>'. All such
    fields are written in a single '$set' update; if every argument is None, nothing is sent.

    :param entry_id: The '_id' of the entry in 'pychemia_entries'
    :param nparal: Value stored as 'job.nparal'
    :param queue: Value stored as 'job.queue'
    :param nhours: Value stored as 'job.nhours'
    :param mail: Value stored as 'job.mail'
    :param task_name: Value stored as 'job.task_name'
    :param task_settings: Value stored as 'job.task_settings'
    :param task_kind: Value stored as 'job.task_kind'
    """
    requested = {'job.nparal': nparal,
                 'job.queue': queue,
                 'job.mail': mail,
                 'job.nhours': nhours,
                 'job.task_name': task_name,
                 # Previously this field was mistakenly filled with 'task_name'.
                 'job.task_kind': task_kind,
                 'job.task_settings': task_settings}
    changes = {field: value for field, value in requested.items() if value is not None}
    if changes:
        self.db.pychemia_entries.update({'_id': entry_id}, {'$set': changes})
def set_output_structure(self, entry_id, structure):
    """
    Store 'structure' as the output structure of the entry 'entry_id'.

    Shortcut for set_structure with location 'output'.
    """
    return self.set_structure(entry_id, location='output', structure=structure)
def get_structure(self, entry_id, location):
    """
    Return the structure stored in the entry with id 'entry_id'.

    Only the requested section is fetched from the database, and its 'structure' value is
    rebuilt with Structure.from_dict. If no entry matches, find_one returns None and the
    lookup below raises TypeError; if the section has no 'structure', KeyError is raised.

    :param entry_id: The '_id' of the entry in 'pychemia_entries'
    :param location: (str) Either 'input' or 'output'
    :raises AssertionError: If 'location' is not 'input' or 'output'
    :rtype : Structure
    """
    assert location in ('input', 'output')
    projection = {location: 1}
    document = self.db.pychemia_entries.find_one({'_id': entry_id}, projection)
    section = document[location]
    return Structure.from_dict(section['structure'])
def get_output_structure(self, entry_id):
    """
    Return the output structure of the entry 'entry_id'.

    Shortcut for get_structure with location 'output'.

    :rtype : Structure
    """
    return self.get_structure(entry_id, location='output')