# Source code for railroadtracks.unifex

# Copyright 2014-2015 Novartis Institutes for Biomedical Research

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Unified execution layer.

One general way to run things on the command line.
"""

import sys, os, argparse
import collections
import logging
logger = logging.getLogger(__name__)
import railroadtracks
from importlib import import_module
from . import environment
from . import core


class UnifexError(Exception):
    """Error raised by this module when a model instance cannot be created."""

#FIXME: several classes 'Call' in rrt, I think. Delete/rename others
class Call(object):
    """
    Unified call, turning a step + assets + parameters into a task.

    :param step: object whose `run` method performs the task
    :param assets: assets handed to the step when the task is executed
    :param parameters: parameters handed to the step when the task is executed
    """

    def __init__(self, step, assets, parameters):
        self._step = step
        self._assets = assets
        self._parameters = parameters

    def execute(self):
        """
        Execute the task.

        :return: whatever the step returns when run
        """
        # NOTE(review): the call target was lost when this source was
        # extracted; `self._step.run(self._assets, ...)` is the reconstruction
        # consistent with the rest of the module - confirm against upstream.
        return self._step.run(self._assets, self._parameters)

    @property
    def step(self):
        """Step given at construction time (read-only)."""
        return self._step

    @property
    def assets(self):
        """Assets given at construction time (read-only)."""
        return self._assets

    @property
    def parameters(self):
        """Parameters given at construction time (read-only)."""
        return self._parameters

    def __str__(self):
        # default object representation, followed by one line per attribute
        res = (super(Call, self).__str__(),
               ' step: %s' % str(self.step),
               ' assets: %s' % str(self.assets),
               ' parameters: %s' % str(self.parameters))
        return os.linesep.join(res)
def _set_logging(args):
    """
    Configure logging to a file from parsed command-line arguments.

    Nothing is configured when no logging file was requested.

    :param args: object with the attributes `logging_file` and `logging_level`
                 (the latter being the name of a level in :mod:`logging`).
    """
    if args.logging_file is not None:
        logging.basicConfig(filename=args.logging_file,
                            level=getattr(logging, args.logging_level))


def _make_stepdict(module):
    """
    Make a dict of classes (key=step name, value=class).

    Duplicate step names will raise a ValueError.

    :param module: module in which the step classes are looked up
                   (with :func:`core.steplist`)
    :rtype: :class:`dict`
    :raises ValueError: if a class does not inherit from
                        :class:`core.StepAbstract`, has no step name,
                        or uses a step name already taken.
    """
    classlist = core.steplist(module)
    d = dict()
    for cls in classlist:
        if not issubclass(cls, (core.StepAbstract,)):
            raise ValueError("Classes used as step must inherit from core.StepAbstract.")
        stepname = cls._name
        if stepname is None:
            raise ValueError('The step name for class "%s" is not defined.' % cls)
        elif stepname in d:
            raise ValueError('The step name "%s" is defined twice.' % stepname)
        d[stepname] = cls
    return d


def _extract_argdict(arglist):
    """
    Build dictionary from list of parameters.

    :param arglist: sequence of strings '<key>=<value>' (or None).
                    Only the first '=' separates the key from the value.
    :return: mapping key -> list of values, in the order they were given
    :rtype: :class:`collections.defaultdict`
    :raises ValueError: if an item does not contain '='.
    """
    argdict = collections.defaultdict(list)
    if arglist is not None:
        for src in arglist:
            key, sep, value = src.partition('=')
            if not sep:
                # same exception type as the tuple-unpacking failure this
                # replaces, but with a message naming the offending item
                raise ValueError('Expected <label>=<value>, got "%s".' % src)
            argdict[key].append(value)
    return argdict


def _extract_arglist(argdict):
    """
    Build list of parameters from a mapping of parameters.

    :param argdict: mapping key -> sequence of string values (or None)
    :return: strings '<key>=<value>', one per (key, value) pair
    :rtype: :class:`tuple`
    """
    arglist = list()
    if argdict is not None:
        for key, value in argdict.items():
            for v in value:
                src = '='.join((key, v))
                arglist.append(src)
    return tuple(arglist)


#FIXME: moved to ?
def _cmdfromuei(uei):
    """
    Build the unified command line (as a list of strings) for `uei`.

    :param uei: object with the attributes `model`, `executable`, `source`,
                `target` and `parameters`
    :rtype: :class:`list`
    """
    cmd = [sys.executable, '-m', 'railroadtracks.unifex', 'run', uei.model]
    # In the unified command line, the executable does not have to be specified,
    # and when parsing command line arguments with argparse the associated
    # parameter is set to None.
    # When building a unified command line, we skip the executable if it is
    # set to None.
    if uei.executable is not None:
        cmd.append(uei.executable)
    # NOTE(review): `y.name` and `uei.target` below were lost when this source
    # was extracted and are reconstructed by symmetry - confirm against upstream.
    cmd.append('-s')
    cmd.extend('%s=%s' % (x, y.name) for x, y in zip(uei.source._fields, uei.source))
    cmd.append('-t')
    cmd.extend('%s=%s' % (x, y.name) for x, y in zip(uei.target._fields, uei.target))
    if len(uei.parameters) > 0:
        cmd.append('-p')
        # presumably the leading space keeps parameters starting with '-'
        # from being taken for options by the argument parser - TODO confirm
        cmd.extend(" '%s'" % x for x in uei.parameters)
    return cmd


#FIXME: needed ?
# NOTE(review): `executable.run(sources` below was lost when this source was
# extracted and is reconstructed - confirm against upstream.
# def wrapper_run(model_cls, sources, targets,
#                 parameters):
#     executable = model_cls(args.executable, parameters = args.parameters)
#     res = executable.run(sources, targets)
#     return res

def _model_instance(args, steplist):
    """
    Create an instance of the model class named in `args`.

    :param args: arguments, such as the ones returned by
                 :meth:`argparse.ArgumentParser.parse_args`.
                 This should contain 2 attributes:
                 - :attr:`model`, the name of the model used for execution,
                 - :attr:`executable` the name (or path) to the executable used.
    :param steplist: known steps, indexed by model name
    :return: the result of calling the class found under `args.model`
             with `args.executable`
    :raises UnifexError: if creating the instance fails, whether because the
                         executable is missing or for any other reason.
    """
    model_cls = steplist[args.model]
    try:
        model_instance = model_cls(args.executable)
    except environment.MissingSoftware:
        raise UnifexError("""The executable '%s' to use with the model class '%s' is not in the ${PATH}.
Use either the full path, or add the executable to your PATH.
""" % (args.executable, args.model))
    except Exception as e:
        # any other failure is reported as an internal error, with the
        # original exception included in the message
        raise UnifexError("""Internal error while creating a %s using %s:
%s
""" % (model_cls, args.executable, e))
    return model_instance
def build_AssetSet(AssetSet, values):
    """
    Create an instance of `AssetSet` from raw values.

    :param AssetSet: class to instantiate. The description of the assets it
                     expects is read from the attribute named by
                     `core.AssetMetaReserved.SOURCES.value`.
    :param values: values to create instances in the AssetSet, paired
                   positionally with the expected assets
    :return: `AssetSet` called with one positional argument per asset
    """
    # classes expected for the assets
    assetset_cls = getattr(AssetSet, core.AssetMetaReserved.SOURCES.value)
    assets = list()
    for x, val in zip(assetset_cls, values):
        if val is None:
            # if None, whether it is allowed (allownone True/False) is pushed
            # to the construction of the AssetSet below
            #FIXME: is this really the most transparent way to do it ?
            a = val
        elif issubclass(x.cls, core.FileSequence):
            # sequence assets: convert each item with the class' item type
            a = x.cls(x.cls._type(z) for z in val)
        else:
            a = x.cls(val)
        assets.append(a)
    assetset = AssetSet(*assets)
    return assetset
def unified_exec_run(args, steplist, msg=None):
    """
    Run a command.

    :param args: arguments in a class such as the one returned
                 by :meth:`argparse.ArgumentParser.parse_args`
    :param steplist: known steps, indexed by model name. The `args` will be
                     matched against this to find the model class.
    :param msg: list with (eventual) messages. Messages about missing
                sources or targets are appended to it. A new list is used
                when omitted.
    :return: the return code obtained when running the model
    :raises ValueError: if `msg` is not empty once sources and targets
                        have been checked.
    """
    #FIXME: deprecate msg (use logging)
    if msg is None:
        # a fresh list per call: a list literal as default value would be
        # shared (and keep growing) across calls
        msg = []
    _set_logging(args)
    model = _model_instance(args, steplist)
    Assets = model.Assets

    # extract the sources from the args
    sources_dict = _extract_argdict(args.source)
    tmp = ['Source parameters:', ]
    tmp.extend('%s: %s%s' % (k, str(v), os.linesep) for k, v in sources_dict.items())
    logger.debug(os.linesep.join(tmp))
    sources = tuple(sources_dict.get(f) for f in Assets.Source._fields)
    # a missing source is only an error if the asset does not allow None
    sources_undefined = [assetname
                         for src, assetattr, assetname
                         in zip(sources, Assets.Source._sources, Assets.Source._fields)
                         if src is None and not assetattr.allownone]
    if len(sources_undefined) > 0:
        msg.append('The following sources must be defined (and are missing):\n' +
                   '\n'.join('- %s' % name for name in sources_undefined))

    # extract the targets from the args
    # NOTE(review): `args.target` was lost when this source was extracted and
    # is reconstructed by symmetry with `args.source` - confirm against upstream.
    targets_dict = _extract_argdict(args.target)
    targets = tuple(targets_dict.get(f) for f in Assets.Target._fields)
    if None in targets:
        msg.append('The following targets must be defined (and are missing):\n' +
                   '\n'.join('- %s' % name
                             for name, tgt in zip(Assets.Target._fields, targets)
                             if tgt is None))

    # exit early if missing parameters
    if len(msg) > 0:
        raise ValueError('\n'.join(msg))

    # build the assets
    assets = Assets(build_AssetSet(Assets.Source, sources),
                    build_AssetSet(Assets.Target, targets))
    cleanparameters = tuple(x.lstrip() for x in args.parameters)
    # NOTE(review): `model.run(assets` was lost when this source was extracted
    # and is reconstructed - confirm against upstream.
    cmd, returncode = model.run(assets, cleanparameters)
    status = 'return code: %i' % returncode
    logger.debug(status)
    if returncode != 0:
        sys.stderr.write(status)
        sys.stderr.write('\n')
    return returncode
def unified_exec_version(args, steplist):
    """
    Print the `version` attribute of the model instance named in `args`.

    :param args: parsed command-line arguments
    :param steplist: known steps, indexed by model name
    :return: 0
    """
    _set_logging(args)
    executable = _model_instance(args, steplist)
    print(executable.version)
    return 0


def unified_exec_activities(args, steplist):
    """
    Print the `activities` attribute of the model instance named in `args`.

    :param args: parsed command-line arguments
    :param steplist: known steps, indexed by model name
    :return: 0
    """
    _set_logging(args)
    executable = _model_instance(args, steplist)
    print(executable.activities)
    return 0


def unified_exec_model(args, steplist):
    """
    Print the names of the known steps, separated by spaces.

    :param args: parsed command-line arguments
    :param steplist: known steps, indexed by model name
    :return: 0
    """
    _set_logging(args)
    print(' '.join(str(x) for x in steplist.keys()))
    return 0


def unified_exec():
    """
    Parse the command line and perform the requested action.

    The actions are `run`, `version`, `activities` and `model`.

    :return: the return code of the action performed
    """
    # The original `run` sub-command referred to an undefined name for its
    # logging levels; the levels accepted by the other sub-commands are used
    # for all of them.
    logging_levels = ('INFO', 'DEBUG')

    def _steplist(module_name):
        module = import_module(module_name)
        steplist = _make_stepdict(module)
        return steplist

    def _unified_exec_run(args):
        msg = []
        try:
            returncode = unified_exec_run(args, _steplist(args.module), msg=msg)
        except ValueError as ve:
            logger.error('%s', ve)
            raise
        return returncode

    def _unified_exec_version(args):
        return unified_exec_version(args, _steplist(args.module))

    def _unified_exec_activities(args):
        return unified_exec_activities(args, _steplist(args.module))

    def _unified_exec_model(args):
        return unified_exec_model(args, _steplist(args.module))

    def _add_model_arguments(subparser):
        # positional arguments shared by `run`, `version` and `activities`
        subparser.add_argument('model',
                               help='Class name used to model that executable. The classes are defined in the model (see -m/--module).')
        subparser.add_argument('executable',
                               nargs='?',  # this is optional
                               help='Name, or full path, for an executable. '
                               'Note that the executable can be R, '
                               'and in that case the class name (argument "model") '
                               'will wrap an R script to be run with that R version.')

    def _add_module_argument(subparser):
        # option shared by all sub-commands
        subparser.add_argument('-m', '--module',
                               default='railroadtracks.rnaseq',
                               help='module defining all models (default: %(default)s)')

    def _add_logging_arguments(subparser):
        # options shared by all sub-commands
        subparser.add_argument('--logging-file',
                               help='Name of log file.')
        subparser.add_argument('--logging-level',
                               choices=logging_levels,
                               default='INFO',
                               help='Level of logging information. DEBUG is quite verbose (default: %(default)s).')

    parser = argparse.ArgumentParser(description='Unified model for RNA-Seq steps')
    subparsers = parser.add_subparsers(help='Action to perform. '
                                       '`run` will call the executable specified in the next parameter, '
                                       '`version` will return the version number, '
                                       '`activities` will list the RNA-Seq activities the executable is associated with')

    #FIXME: This means that one will be able to point to different R version, and packages will be coming
    #       from that R version

    # run
    parser_run = subparsers.add_parser('run', help="Run a step")
    _add_model_arguments(parser_run)
    _add_module_argument(parser_run)
    parser_run.add_argument('-s', '--source',
                            nargs='*',
                            help='Source file indicated as <label>=<filename>')
    parser_run.add_argument('-t', '--target',
                            nargs='*',
                            help='target file indicated as <label>=<filename>')
    parser_run.add_argument('-p', '--parameters',
                            nargs='*',
                            default=(),
                            help='Additional parameter(s) for the wrapped executable')
    _add_logging_arguments(parser_run)
    parser_run.set_defaults(func=_unified_exec_run)

    # version
    parser_version = subparsers.add_parser('version', help="Query version number")
    _add_model_arguments(parser_version)
    _add_module_argument(parser_version)
    _add_logging_arguments(parser_version)
    parser_version.set_defaults(func=_unified_exec_version)

    # activities
    parser_activities = subparsers.add_parser('activities', help="Query activities")
    _add_model_arguments(parser_activities)
    _add_module_argument(parser_activities)
    _add_logging_arguments(parser_activities)
    parser_activities.set_defaults(func=_unified_exec_activities)

    # model
    parser_model = subparsers.add_parser('model', help="Query on a model module")
    _add_module_argument(parser_model)
    _add_logging_arguments(parser_model)
    parser_model.set_defaults(func=_unified_exec_model)

    args = parser.parse_args()
    func = getattr(args, 'func', None)
    if func is None:
        # sub-commands are optional by default on Python 3: without one,
        # no `func` is set on the parsed arguments
        parser.error('an action is required (run, version, activities, or model)')
    # hand the return code back to the caller so it can become the exit status
    return func(args)


if __name__ == '__main__':
    import sys
    returncode = unified_exec()
    sys.exit(returncode)