# Source code for railroadtracks.easy

# Copyright 2014-2015 Novartis Institutes for Biomedical Research

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" 
Package aiming at making common operations "easy".
"""

import logging
logger = logging.getLogger(__name__)

from collections import namedtuple, OrderedDict, Counter, defaultdict
import collections
from importlib import import_module
from railroadtracks import core, unifex, rnaseq, hortator
import os, shutil, time
import json
import tempfile
import itertools, operator

from railroadtracks.easy.tasks import Task, TaskSet, DbID
from railroadtracks.easy.tasks import (_TASK_DONE,
                                       _TASK_TODO,
                                       _TASK_INPROGRESS,
                                       _TASK_FAILED)

from railroadtracks.easy.execution import (DummyExecution,
                                           IterativeExecution,
                                           MultiprocessingExecution)

# Public (underscore-free) aliases for the task status constants imported
# from railroadtracks.easy.tasks.
(TASK_DONE,
 TASK_TODO,
 TASK_INPROGRESS,
 TASK_FAILED) = (_TASK_DONE,
                 _TASK_TODO,
                 _TASK_INPROGRESS,
                 _TASK_FAILED)

class Asset(object):
    """
    Asset attached to a task.

    An asset is either used by a task (it is then a *source* asset) or is
    produced by a task (it is then a *target* asset).

    :param project: project the asset belongs to
    :param savedentity: the saved entity wrapped by this asset (instance of a
        child class of :class:`SavedEntityAbstract`)
    :param asset_id: identifier of the asset
    """
    __slots__ = ('_asset_id', '_entity', '_project')

    def __init__(self, project, savedentity, asset_id):
        self._project = project
        self._entity = savedentity
        self._asset_id = asset_id

    @property
    def id(self):
        """Identifier of the asset (as given to the constructor)."""
        return self._asset_id

    @property
    def entity(self):
        """
        Instance of :class:`SavedEntityAbstract` (or rather a child class thereof).
        """
        return self._entity

    @property
    def project(self):
        """Project the asset belongs to."""
        return self._project

    @property
    def parenttask(self):
        """Parent task of this asset, as looked up in the project's persistent graph."""
        graph = self.project.persistent_graph
        return graph.get_parenttask_of_storedentity(self.id)
# One row of the activity/status count query: (count, activity label, status label)
ActivityCount = namedtuple('ActivityCount', 'count name status')


class ActivityDoneCount(object):
    """
    Mutable tally of tasks for a named group (an activity or a result type).

    :param name: label of the group
    :param count_done: number of tasks of the group with a "done" status
    :param count_failed: number of tasks of the group with a "failed" status
    :param total: total number of tasks in the group
    """
    __slots__ = ['count_done', 'count_failed', 'total', 'name']

    def __init__(self, name, count_done=0, count_failed=0, total=0):
        self.count_done = count_done
        self.count_failed = count_failed
        self.total = total
        self.name = name


def _percentage(val, maxval):
    """Return `val` as a percentage of `maxval` (0.0 when `maxval` is 0)."""
    if maxval == 0:
        return 0.0
    return 100.0 * val / maxval


def dict_project_view_storage(project):
    """
    Summarize the storage state of a project.

    :param project: a :class:`Project`
    :rtype: :class:`dict` with the keys 'status', 'wd', 'wdspace_avail',
        'wdspace_used', 'wdspace_total' (disk space of the working directory's
        file system, in GB), 'db_fn', 'db_fn_size' (size of the database file
        in MB, as a formatted string), 'taskstatuscount', and 'totaltasks'.
    """
    if project.newproject:
        status = 'New "railroadtracks" project'
    else:
        status = 'Reopened existing "railroadtracks" project'
    statvfs = os.statvfs(project.wd)
    # float() keeps the sizes fractional even with Python 2 integer division
    gigabyte = float(1024 ** 3)
    avail = statvfs.f_bavail * statvfs.f_frsize / gigabyte
    total = statvfs.f_blocks * statvfs.f_frsize / gigabyte
    used = (statvfs.f_blocks - statvfs.f_bfree) * statvfs.f_frsize / gigabyte
    taskstatuscount = project.persistent_graph.nconcrete_steps_status
    totaltasks = sum(x.count for x in taskstatuscount)
    db_size_mb = os.stat(project.db_fn).st_size / float(1024 ** 2)
    return {'status': status,
            'wd': project.wd,
            'wdspace_avail': avail,
            'wdspace_used': used,
            'wdspace_total': total,
            'db_fn': project.db_fn,
            'db_fn_size': '{:,.2f}'.format(db_size_mb),
            'taskstatuscount': taskstatuscount,
            'totaltasks': totaltasks}


def dict_project_view_activities(project, done=hortator._TASK_DONE, failed=hortator._TASK_FAILED):
    """
    Count tasks per activity, and how many of them are done or failed.

    :param project: a :class:`Project`
    :param done: status label counted as "done"
    :param failed: status label counted as "failed"
    :rtype: :class:`dict` with the key 'activities' (a list of
        :class:`ActivityDoneCount`, one per activity)
    """
    cursor = project.persistent_graph.connection.cursor()
    sql = """
    SELECT count(step_activity_id), step_activity.label, ss.label
    FROM step_activity
    LEFT JOIN step_type2activity AS sta
      ON sta.step_activity_id=step_activity.id
    LEFT JOIN step_type
      ON sta.step_type_id=step_type.id
    LEFT JOIN step_variant AS sv
      ON sv.step_type_id=step_type.id
    LEFT JOIN step_concrete AS sc
      ON sc.step_variant_id=sv.id
    LEFT JOIN step_status AS ss
      ON ss.id=sc.step_status_id
    GROUP BY step_activity_id, step_status_id
    ORDER BY step_activity_id
    """
    cursor.execute(sql)
    counts = tuple(ActivityCount(*row) for row in cursor.fetchall())
    activities = list()
    # rows are ordered by activity, so groupby() sees each activity once
    for name, grp in itertools.groupby(counts, operator.attrgetter('name')):
        adc = ActivityDoneCount(name)
        for ac in grp:
            if ac.status == done:
                adc.count_done = ac.count
            elif ac.status == failed:
                adc.count_failed = ac.count
            adc.total += ac.count
        activities.append(adc)
    return {'activities': activities}


def dict_project_view_results(project):
    """
    Count final targets per stored-entity class, and how many are done.

    :param project: a :class:`Project`
    :rtype: :class:`dict` with the key 'results' (a list of
        :class:`ActivityDoneCount`, one per stored-entity class name)
    """
    counts = dict()
    for result in project.persistent_graph.iter_finaltargets():
        count_done = 1 if result.status_label == _TASK_DONE else 0
        clsname = result.stored_entity_classname
        adc = counts.get(clsname)
        if adc is None:
            counts[clsname] = ActivityDoneCount(clsname, count_done=count_done, total=1)
        else:
            adc.count_done += count_done
            adc.total += 1
    # a list (rather than a dict view) can be iterated, measured, and indexed
    return {'results': list(counts.values())}


def dict_assetset_view(assetset):
    """
    Describe the elements of an asset set.

    :param assetset: a :class:`core.AssetSet`
    :rtype: :class:`dict` with the keys 'super_repr' and 'elements' (a list of
        (isdefined, name, cls) named tuples)
    """
    AssetSetElement = namedtuple('AssetSetElement', 'isdefined name cls')
    d = {'super_repr': super(core.AssetSet, assetset).__repr__(),
         'elements': []}
    for i in range(len(assetset)):
        src = assetset._sources[i]
        isdefined = getattr(assetset, src.name)._defined
        d['elements'].append(AssetSetElement(isdefined, src.name, str(src.cls)))
    return d


def str_progressbar(val, maxval, width=20, fillchar="#", emptychar=' '):
    """
    Render a text progress bar.

    :param val: current value (must not exceed `maxval`)
    :param maxval: value corresponding to a full bar; when 0 the bar is empty
    :param width: number of characters in the bar
    :param fillchar: character for the completed part
    :param emptychar: character for the remaining part (None to omit it)
    :rtype: :class:`str`
    """
    assert val <= maxval
    if maxval == 0:
        # nothing to measure progress against (avoids ZeroDivisionError)
        nfill = 0
    else:
        nfill = int(round(1.0 * val / maxval * width))
    res = fillchar * nfill
    if emptychar is not None:
        res += emptychar * (width - nfill)
    return res


def str_project_view_storage(project, width=10):
    """
    Text summary of the storage state of a project.

    :param project: a :class:`Project`
    :param width: unused (kept for interface compatibility)
    :rtype: :class:`str`
    """
    d = dict_project_view_storage(project)
    for k in ('wdspace_avail', 'wdspace_total'):
        d[k] = '{:,.2f}'.format(d[k])
    res = """Status: %(status)s
Working directory: %(wd)s
Storage
  Available: %(wdspace_avail)s GB
  Total: %(wdspace_total)s GB
Tasks
  Total: %(totaltasks)i
""" % d
    return res


def str_project_view_activities(project, done=hortator._TASK_DONE, failed=hortator._TASK_FAILED, width=20):
    """
    Text table with the progress of each activity in a project.

    :param project: a :class:`Project`
    :param done: status label counted as "done"
    :param failed: status label counted as "failed"
    :param width: width of the progress bars
    :rtype: :class:`str`
    """
    activities = dict_project_view_activities(project, done=done, failed=failed)['activities']
    # column width: longest activity name, but at least the header
    name_len = max([len('Activity')] + [len(a.name) for a in activities])
    res = ['%s %s' % ('Activity'.center(name_len), 'Progress')]
    for a in activities:
        res.append('%s |%s| %.2f%% (%s/%s - failed: %s)' %
                   (a.name.ljust(name_len),
                    str_progressbar(a.count_done, a.total, width=width),
                    _percentage(a.count_done, a.total),
                    '{:,}'.format(a.count_done),
                    '{:,}'.format(a.total),
                    '{:,}'.format(a.count_failed)))
    res.append('')
    return os.linesep.join(res)


def str_project_view_results(project, width=20):
    """
    Text table with the progress of each type of final result in a project.

    :param project: a :class:`Project`
    :param width: width of the progress bars
    :rtype: :class:`str`
    """
    results = dict_project_view_results(project)['results']
    # column width: longest result type name, but at least the header
    name_len = max([len('Result Type')] + [len(r.name) for r in results])
    res = ['%s %s' % ('Result Type'.center(name_len), 'Progress')]
    for r in results:
        res.append('%s |%s| %.2f%%' %
                   (r.name.ljust(name_len),
                    str_progressbar(r.count_done, r.total, width=width),
                    _percentage(r.count_done, r.total)))
    res.append('')
    return os.linesep.join(res)


def str_project_view(project):
    """
    Full text summary of a project (storage, activities, results), with
    sections separated by a line with "--".
    """
    sections = (str_project_view_storage(project),
                str_project_view_activities(project),
                str_project_view_results(project))
    return ('--%s' % os.linesep).join(sections)
class Project(object):
    """
    A project, that is a directory containing data as well as a persistent
    storage of steps and how derived data and final results were obtained.
    """

    def __init__(self, model, wd='railroadtracks_project', db_fn=None,
                 force_create=False, isolation_level=None, cached=False):
        """
        :param model: module with the definition of the `model` (that is the
            classes defining the type of tasks computed). It must have an
            attribute ``ACTIVITY``.
        :param wd: Name of an existing working directory (where all
            intermediate and final results will be saved).
        :type wd: :class:`str`
        :param db_fn: Name of a file name for the database. If None, use the
            file "railroadtracks.db" in the directory specified in "wd".
        :type db_fn: :class:`str` or None
        :param force_create: passed to the constructor of the persistent
            graph; a true value also flags the project as new.
        :param isolation_level: passed to the constructor for
            :class:`PersistentTaskGraph`
        :param cached: if true, use :class:`hortator.CachedPersistentTaskGraph`
            rather than :class:`hortator.PersistentTaskGraph`.
        :raises ValueError: if `model` has no attribute ``ACTIVITY``, or if
            `wd` is not an existing directory.
        """
        # check the model
        if not hasattr(model, 'ACTIVITY'):
            raise ValueError('The parameter `model` does not appear to be a model (ACTIVITY missing).')
        # derived data files and final results will be in the working directory
        if not os.path.isdir(wd):
            raise ValueError("The working directory '%s' should be a directory" % wd)
        if db_fn is None:
            db_fn = os.path.join(wd, 'railroadtracks.db')
        # new project: creation is forced, or there is no database yet
        # (checked before the persistent graph creates the database file)
        newproject = force_create or not os.path.exists(db_fn)
        if cached:
            cls_persistent = hortator.CachedPersistentTaskGraph
        else:
            cls_persistent = hortator.PersistentTaskGraph
        self._persistent_graph = cls_persistent(db_fn, model, wd=wd,
                                                force_create=force_create,
                                                isolation_level=isolation_level)
        # cache the dependency graph
        self._cache = hortator.StepGraph(self._persistent_graph)
        self._wd = wd
        self._db_fn = db_fn
        self._newproject = newproject

    @property
    def persistent_graph(self):
        """ Access to the persistent graph. This will be slower than accessing
        the cached version, but will always be accurate."""
        return self._persistent_graph

    @property
    def cache(self):
        """ Cache for the dependency graph. This may not reflect changes made
        by an independent access to the project or a direct access to the
        persistent graph (see :attr:`persistent_graph`) but will be much faster
        and the preferred way to access the graph when there is only one
        concurrent access. """
        return self._cache

    @property
    def wd(self):
        """Working directory. The directory in which derived data is stored."""
        return self._wd

    @property
    def db_fn(self):
        """Path to the database file"""
        return self._db_fn

    @property
    def newproject(self):
        """Tell whether this is a new project, rather than an existing project (re)opened. """
        return self._newproject

    @property
    def model(self):
        """Model used in the project."""
        return self.persistent_graph._model

    def __repr__(self):
        if self.newproject:
            status = 'New project'
        else:
            status = 'Reopened existing project'
        res = """
%s
Working directory: %s
Database file: %s
Number of recorded steps: %i
"""
        return res % (status, self.wd, self.db_fn, self.persistent_graph.nconcrete_steps)

    def __str__(self):
        return str_project_view(self)

    def get_targetsofactivity(self, activity):
        """ Retrieve the targets of steps performing a specific activity.
        (calls the method of the same name in the contained :class:`PersistentTaskGraph`)

        :param activity: an activity
        :type activity: :class:`Enum`
        """
        return self.persistent_graph.get_targetsofactivity(activity)

    def get_targetsoftype(self, obj):
        """ Retrieve the targets having a given type.
        (calls the method of the same name in the contained :class:`PersistentTaskGraph`)

        :param obj: a type, an instance, or a type name
        :type obj: :class:`type`, :class:`Object`, or :class:`str`
        :raises ValueError: if `obj` is neither a class name, an instance of
            :class:`core.SavedEntityAbstract`, nor a subclass of it.
        """
        if isinstance(obj, str):
            # name of a class in the model
            cls = getattr(self.persistent_graph._model, obj)
        elif isinstance(obj, core.SavedEntityAbstract):
            cls = type(obj)
        elif isinstance(obj, type) and issubclass(obj, core.SavedEntityAbstract):
            # isinstance() guard: issubclass() raises TypeError on non-classes
            cls = obj
        else:
            raise ValueError("Invalid parameter.")
        return self.persistent_graph.get_targetsoftype(cls.__name__)

    def get_tasksoftype(self, taskorstep):
        """ Retrieve tasks matching the type of "taskorstep".

        The parameter 'taskorstep' can be either:

        - a class inheriting from StepAbstract
        - an instance of Task
        - an instance of StepAbstract

        In the two last cases, the path to the executable and the version
        number are used to filter the tasks returned.

        :rtype: :class:`TaskSet`
        :raises ValueError: if `taskorstep` is none of the above.
        """
        graph = self.persistent_graph
        cursor = graph.connection.cursor()
        if isinstance(taskorstep, type) and issubclass(taskorstep, core.StepAbstract):
            cls = taskorstep
            step_type_id = graph.id_step_type(cls.activities)
            sql = """
            SELECT id
            FROM step_variant
            WHERE step_type_id=?
            AND cls=?
            """
            #FIXME: building the 'cls' string the SQL string
            #       should be factored out as a function
            #       in hortator.py
            cursor.execute(sql, (step_type_id.id,
                                 '.'.join((cls.__module__, cls.__name__))))
            stepvariant_ids = tuple(x[0] for x in cursor.fetchall())
        elif isinstance(taskorstep, (Task, core.StepAbstract)):
            if isinstance(taskorstep, Task):
                step = taskorstep.call.step
            else:
                step = taskorstep
            stepvariant_ids = (graph.id_step_variant(step, step.activities).id, )
        else:
            raise ValueError('"taskorstep" can be either a Task or a core.StepAbstract')
        taskset = TaskSet()
        sql = """
        SELECT step_concrete.id
        FROM step_concrete
        WHERE step_concrete.step_variant_id=?
        """
        for sv_id in stepvariant_ids:
            cursor.execute(sql, (sv_id,))
            for res in cursor.fetchall():
                taskset.add(self.get_task(res[0]))
        return taskset

    def get_taskswithactivity(self, activity):
        """ Retrieve tasks matching the given activity.

        :param activity: an activity (its ``value`` is matched against the
            activity labels in the database)
        :rtype: :class:`TaskSet`
        """
        cursor = self.persistent_graph.connection.cursor()
        taskset = TaskSet()
        sql = """
        SELECT step_concrete.id
        FROM step_concrete
        INNER JOIN step_variant
          ON step_variant.id=step_concrete.step_variant_id
        INNER JOIN step_type
          ON step_type.id=step_variant.step_type_id
        INNER JOIN step_type2activity
          ON step_type2activity.step_type_id=step_type.id
        INNER JOIN step_activity
          ON step_activity.id=step_type2activity.step_activity_id
        WHERE step_activity.label=?
        """
        cursor.execute(sql, (activity.value,))
        for res in cursor.fetchall():
            taskset.add(self.get_task(res[0]))
        return taskset

    def iter_srcassets(self, task):
        """ Return the source files for a given task
        (calls the method of the same name in the contained :class:`PersistentTaskGraph`)

        :param task: a :class:`Task`.
        """
        for x in self.persistent_graph.get_srcassets(task.task_id):
            yield Asset(self, x.resurrect(self.model), DbID(x.id))

    def iter_targetassets(self, task):
        """ Return the target files for a given task
        (calls the method of the same name in the contained :class:`PersistentTaskGraph`)

        :param task: a :class:`Task`.
        """
        for x in self.persistent_graph.get_targetassets(task.task_id):
            yield Asset(self, x.resurrect(self.model), DbID(x.id))

    def add_task(self, step, assets, parameters=(), tag=1):
        """ Add a task to the project. If any of the assets' targets is not
        defined, it will be defined automatically.

        :param step:
        :type step: :class:`StepAbstract`
        :param assets:
        :param parameters:
        :param tag: a tag (to differentiate repetitions of the exact same task)
        :rtype: :class:`Task`
        """
        step_concrete_id = self.cache.add(step, assets,
                                          parameters=tuple(parameters),
                                          tag=tag)
        call = unifex.Call(step, assets, parameters)
        return Task(self, call, step_concrete_id)

    def get_task(self, task_id):
        """ Given a task ID, retrieve the associated task.

        :param task_id: a :class:`DbID`, or a raw ID (wrapped in a :class:`DbID`)
        :rtype: :class:`Task`
        """
        if not hasattr(task_id, 'id'):
            task_id = DbID(task_id)
        task_dbentry = self.persistent_graph._get_stepconcrete(task_id)
        # get the step class: a qualified name is imported, an unqualified
        # one is looked up in the project's model module
        if '.' in task_dbentry.clsname:
            module_name, cls_name = task_dbentry.clsname.rsplit('.', 1)
            module = import_module(module_name)
        else:
            module = self.persistent_graph._model
            cls_name = task_dbentry.clsname
        cls = getattr(module, cls_name)
        step = cls(task_dbentry.executable)
        # get the assets, placed according to their label
        model = self.persistent_graph._model
        sources = [None, ] * len(cls.Assets.Source._fields)
        for x in self.persistent_graph.get_srcassets(task_id):
            sources[cls.Assets.Source._fields.index(x.label)] = x.resurrect(model)
        targets = [None, ] * len(cls.Assets.Target._fields)
        for x in self.persistent_graph.get_targetassets(task_id):
            targets[cls.Assets.Target._fields.index(x.label)] = x.resurrect(model)
        assets = cls.Assets(cls.Assets.Source(*sources),
                            cls.Assets.Target(*targets))
        # get the parameters
        parameters = json.loads(task_dbentry.parameters)
        # build the call, then the task
        call = unifex.Call(step, assets, parameters)
        return Task(self, call, task_dbentry.id)

    def add_asset(self, asset):
        """ Register a stored entity in the persistent graph.

        :param asset: a saved entity with a ``name`` attribute
        :rtype: :class:`hortator.StoredEntity`
        :raises NotImplementedError: for sequences of stored entities
            (objects with an ``iter_storedentities`` attribute)
        """
        if hasattr(asset, 'iter_storedentities'):
            raise NotImplementedError('Not yet implemented')
        seid = self.persistent_graph.id_stored_entity(type(asset), asset.name)
        return hortator.StoredEntity(seid.id, None, asset.__class__.__name__, asset.name)
def call_factory(project, step_concrete_id, stepobj, assets, parameters=()):
    """
    Build a callable that runs a step and records it as done in the project.

    :param project: project holding the persistent graph to update
    :param step_concrete_id: ID of the task (concrete step) in the database
    :param stepobj: Step object
    :type stepobj: instance of :class:`StepAbstract`
    :param assets: assets
    :type assets: instance of :class:`AssetsStep`
    :param parameters: parameters passed to ``stepobj.run()``
    :rtype: :class:`function`

    The returned function takes no argument. It raises :class:`RuntimeError`
    when the step's return code is not 0; otherwise it sets the task's state
    to "done" in the persistent graph.
    """
    def call():
        # `hortator` is the module-level `railroadtracks.hortator`; a local
        # `import hortator` does not resolve from inside the package.
        cmd, returncode = stepobj.run(assets, parameters=parameters)
        if returncode != 0:
            # str() each element so a non-string argument cannot mask the error
            raise RuntimeError('Error (return code %i) while running: %s'
                               % (returncode, ' '.join(str(c) for c in cmd)))
        project.persistent_graph.step_concrete_state(
            step_concrete_id,
            hortator._TASK_STATUS_LIST[hortator._TASK_DONE])
    return call
def _delete_tasks_in_set(taskset): """ Delete a set of tasks. """ # get the target assets associated with the task #db = task.project.persistent_graph #storedentitities = list() #for task in taskset: # storedentities.extend(db.get_targetassets(task.id)) sql_create_task_tmp = """ CREATE TEMP TABLE task_to_delete ( id INTEGER PRIMARY KEY )""" sql_add_task_id_to_delete = """ INSERT INTO task_to_delete (id) VALUES (?) """ sql_create_stored_entity_tmp = """ CREATE TEMP TABLE se_to_delete AS SELECT stored_entity.id AS se_id FROM stored_entity INNER JOIN step_concrete2targetfile ON stored_entity.id=step_concrete2targetfile.stored_entity_id INNER JOIN task_to_delete ON task_to_delete.id=step_concrete2targetfile.step_concrete_id WHERE se_id IN (SELECT task_to_delete.id FROM task_to_delete)""" sql_delete_src_associations = """ DELETE FROM step_concrete2targetfile WHERE step_concrete_id IN (SELECT id from task_to_delete) """ sql_delete_target_association = """ DELETE FROM step_concrete2targetfile WHERE step_concrete_id IN (SELECT id from task_to_delete) """ sql_delete_src_association = """ DELETE FROM step_concrete2srcfile WHERE step_concrete_id IN (SELECT id from task_to_delete) """ sql_delete_stored_entry = """ DELETE FROM stored_entity WHERE id IN (SELECT id from se_to_delete) """ sql_delete_tasks = """ DELETE FROM step_concrete WHERE id IN (SELECT id from task_to_delete) """ cursor = taskset._project.persistent_graph.connection.cursor() cursor.execute(sql_create_task_tmp) cursor.executemany(sql_add_task_id_to_delete, ((task.task_id,) for task in taskset)) cursor.execute(sql_create_stored_entity_tmp) # delete the associations between stored_entities that are the target of tasks in the taskset and tasks cursor.execute(sql_delete_target_association) # same for sources cursor.execute(sql_delete_src_association) # delete the stored_entries cursor.execute(sql_delete_stored_entry) # delete the tasks in the taskset cursor.execute(sql_delete_tasks) # delete the directories for 
task in taskset: shutil.rmtree(task.dirname) cursor.execute("DROP TABLE task_to_delete") cursor.execute("DROP TABLE se_to_delete") taskset._project.persistent_graph.connection.commit() class FrozenNamespace(object): # Implementation note: a :class:`namedtuple` was not used, because we wanted # all class attributes and methods to start with '_', in order to let autocompletion # only show instance-defined attributes by default def __init__(self, args): """ :param args: iterable of (key,value) pairs, the key being the name in the namespace. """ fields = [] for key, value in args: fields.append(key) setattr(self, key, value) self.__fields = tuple(fields) def __setattr__(self, key, value): if hasattr(self, key): raise ValueError("The attribute '%s' is already defined." % key) self.__dict__[key] = value @property def _fields(self): return self.__fields
class Environment(object):
    """
    Represent the current environment in a (presumably) easy way for writing recipes.

    Default step instances are exposed grouped by activity, as
    ``env.activities.<activity name>.<lower-cased step class name>``.
    """

    def __init__(self, model):
        """
        :param model: Python module following the :mod:`railroadtracks` model.
        :raises ValueError: if a step class does not inherit from
            :class:`core.StepAbstract`, or if two step classes have the same
            name (exactly, or once converted to lower case).
        """
        # NOTE(review): the result of core.steplist() was never used; the call
        # is kept in case it validates the model - confirm before removing.
        core.steplist(model)
        activityenum = model.ACTIVITY
        stepclasses = list()
        stepinstances = list()
        knownclassnames = set()
        knowninstancenames = set()
        for cls in unifex._make_stepdict(model).values():
            if not issubclass(cls, core.StepAbstract):
                raise ValueError("Classes used as steps must inherit from core.StepAbstract.")
            clsname = cls.__name__
            # instance names are the lower-cased class names, so they must be
            # unique too
            instancename = clsname.lower()
            if clsname in knownclassnames:
                raise ValueError('The step name "%s" is defined twice.' % clsname)
            if instancename in knowninstancenames:
                raise ValueError('The step name "%s", when converted to lower case, is defined twice.' % clsname)
            stepclasses.append(cls)
            stepinstances.append(cls(cls._default_execpath))
            knownclassnames.add(clsname)
            knowninstancenames.add(instancename)
        stepclasses = tuple(stepclasses)
        stepinstances = tuple(stepinstances)
        # (class, default instance) pairs for each activity, in enum order
        steps_by_activity = OrderedDict((activity, []) for activity in activityenum)
        for stepcls, stepinstance in zip(stepclasses, stepinstances):
            for activity in stepcls.activities:
                steps_by_activity[activity].append((stepcls, stepinstance))
        activitylist = [
            FrozenNamespace((stepcls.__name__.lower(), stepinstance)
                            for stepcls, stepinstance in pairs)
            for pairs in steps_by_activity.values()]
        self.__activities = FrozenNamespace(
            (activity.name, namespace)
            for activity, namespace in zip(steps_by_activity.keys(), activitylist))
        self._stepclasses = stepclasses
        self._stepinstances = stepinstances

    @property
    def activities(self):
        """ Access the activities declared by the model as attributes. """
        return self.__activities

    @property
    def stepclasses(self):
        """ Steps. """
        return self._stepclasses

    @property
    def stepinstances(self):
        """ Default instance for the steps (created from the default executables in the PATH). """
        return self._stepinstances
if __name__ == '__main__':
    import argparse
    import importlib

    def _open_project(args):
        """Open the :class:`Project` described by the command-line options."""
        if args.model is None:
            parser.error('the option -m/--model is required')
        model = importlib.import_module(args.model)
        kwargs = {'db_fn': args.db_fn}
        # only override the default working directory when one is given
        if args.wd is not None:
            kwargs['wd'] = args.wd
        return Project(model, **kwargs)

    def _exec_info(args):
        """Print the information stored about a task."""
        project = _open_project(args)
        res = project.persistent_graph.step_concrete_info(DbID(args.task_id))
        print(res)

    def _exec_cmd(args):
        """Return the unified execution command line (not implemented yet)."""
        _open_project(args)
        raise NotImplementedError()

    def _exec_run(args):
        """Run a task, and record whether it succeeded or failed."""
        project = _open_project(args)
        task = project.get_task(args.task_id)
        if task.status == hortator._TASK_DONE:
            print('The task %i is already done' % args.task_id)
            return
        try:
            task.execute()
        except BaseException:
            # record the failure, then let the error (and exit code) surface
            task.status = hortator._TASK_FAILED
            raise
        task.status = hortator._TASK_DONE

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(help='Action to perform.')
    parser_info = subparsers.add_parser('status',
                                        help="Retrieve information about a step.")
    parser_cmd = subparsers.add_parser('cmd',
                                       help="Return the unified execution command line.")
    parser_run = subparsers.add_parser('run',
                                       help="Run a step.")
    parser.add_argument('-d', '--working-directory',
                        dest='wd',
                        help='Working directory for the project.')
    parser.add_argument('-b', '--db-file',
                        dest='db_fn',
                        help='Database file')
    parser.add_argument('-m', '--model',
                        dest='model',
                        help='Model (as a Python module)')
    parser.add_argument(dest='task_id',
                        type=int,
                        help='Task ID')
    parser_info.set_defaults(func=_exec_info)
    parser_cmd.set_defaults(func=_exec_cmd)
    parser_run.set_defaults(func=_exec_run)
    args = parser.parse_args()
    # subcommands are optional in Python 3's argparse
    if not hasattr(args, 'func'):
        parser.error('an action (status, cmd, or run) is required')
    args.func(args)