Extending the framework

Writing custom steps

Simple case

A lot of the boilerplate handling is handled by parent classes. A custom class for a new tool will have to implement code to:

  • create an instance (constructor)
  • get the version
  • run the step

The base class for steps is the abstract class railroadtracks.core.AssetStep:

class StepAbstract(with_metaclass(abc.ABCMeta, object)):
    """ Abstract parent for steps. """

    # monicker under which the step will be known (must be unique within a step list)
    _name = abc.abstractproperty()
    # default name for executable associated with the class
    _default_execpath = abc.abstractproperty()
    
    # class of assets for the step (must be a child of :class:`AssetsStep`)
    Assets = abc.abstractproperty()

    activities = abc.abstractproperty(None, None, None, 
                                      "Activities associated with the step.")

    version = abc.abstractproperty(None, None, None,
                                   "Version of the executable associated with the step.")


    @property
    def hashdb(self):
        return hash((type(self), self._default_execpath, self.version, self.activities))

    @abc.abstractmethod
    def run(self, assets, parameters=tuple()):
        """ 
        :param assets: Assets to run the step with
        :type assets: :class:`AssetsStep`
        :param parameters: optional parameters
        """
        raise NotImplementedError(_NOTIMPLEMENTED_ABSTRACT)

    def uei(self, assets, parameters = tuple()):
        # FIXME: should return the unified execution command line
        uei = UnifiedExecInfo(self._execpath, self._name,
                              assets.source, assets.target, parameters,
                              None, None) #logging_file and logging_level
        return uei

Custom steps will just have to implement the methods and properties. For example, counting the reads after alignment with htseq-count is defined as a child class of railroadtracks.model.quantify.QuantifierAbstract, where Assets are defined:

class QuantifierAbstract(core.StepAbstract):
    __metaclass__ = abc.ABCMeta
    Assets = AssetsQuantifier
    activities = (ACTIVITY.QUANTIFY, )

Note

There is a class for each activity defined (see Activities), and these can be used as parent class for new steps performing one activity The Assets is inherited from its parent class, and does not need to be specified further.

class AssetsQuantifier(core.AssetsStep):
    Source = core.assetfactory('Source', [core.AssetAttr('alignedreads', BAMFile, ''),
                                          core.AssetAttr('annotationfile', GFFFile, '')])
    Target = core.assetfactory('Target', [core.AssetAttr('counts', CSVFile, '')])

One can note that specific subclasses of railroadtracks.core.SavedEntityAbstract can be specified (here CSVFile to indicate that file produced by the counting step is a CSV file). The definition of assets represents a way to add a static typing flavor to Python, as a number of checks are performed behind the hood by railroadtracks.model.files.

The definition of railroadtracks.model.quantify.HTSeqCount is then implementing the remaining methods and attributes

class HTSeqCount(QuantifierAbstract):
        
    _name = 'htseqcount'
    _default_execpath = 'htseq-count'
    _separator = '\t'
    # set of parameters to get htseq-count to work with references such
    # as bacteria or viruses (stored as a class attribute for convenience)
    _noexons_parameters = ('--type=CDS', '--idattr=db_xref', '--stranded=no')

    def __init__(self, executable=None):
        if executable is None:
            executable = type(self)._default_execpath
        self._execpath = executable
        self._version = None

    @property
    def version(self):
        if self._version is None:
            cmd = [self._execpath, '-h']
            try:
                logger.debug(subprocess.list2cmdline(cmd))
                res = subprocess.check_output(cmd)
            except OSError as ose:
                raise unifex.UnifexError("""Command: %s
                %s""" % (' '.join(cmd), ose))

            m = re.match('^.*( version )?([^ \n]+)\.$', res.split('\n')[-2])
            if m is None:
                raise RuntimeError('Could not find the version number.')
            self._version = m.groups()[1]
        return self._version

    def run(self, assets, parameters = tuple()):
        # FIXME: shouldn't strandedness be a better part of the model ?
        source = assets.source
        sortedbam = source.alignedreads
        if not isinstance(source.alignedreads, BAMFileSortedByID):
            # htseq-count requires sorted entries
            warnings.warn(("The source asset '%s' should ideally be sorted by read IDs. " +\
                           "We are sorting the file; use explicitly a '%s' rather than a '%s' "+\
                           "for better performances, as well as for reproducibility issues "+\
                           "(the sorting will use whatever 'samtools` is first found in the PATH)") \
                          % ("alignedreads", BAMFileSortedByID.__name__, BAMFile.__name__))
            output_dir = os.path.dirname(assets.target.counts.name)
            # temp file name for the sorted output
            sortedbam_fh = tempfile.NamedTemporaryFile(dir=output_dir, suffix=".bam", delete=False)
            # (cleaning temp files handled by Python, except sortedsam)
            # -- sort
            sorter = SamtoolsSorterByID()
            sorter_assets = sorter.Assets(sorter.Assets.Source(source.alignedreads),
                                          sorter.Assets.Target(BAMFile(sortedbam_fh.name)))
            sorter.run(sorter_assets)
            # sanity check:
            if os.stat(sorter_assets.target.sortedbam.name).st_size == 0:
                warnings.warn('The sorted BAM file is empty.')
            sortedbam = sorter_assets.target.sortedbam
        else:
            sortedbam_fh = None

        # BAM to SAM
        cmd_bam2sam = ['samtools', 'view', sortedbam.name]

        # build command line
        cmd_count = [self._execpath, ]
        cmd_count.extend(parameters)
        cmd_count.extend(['-', source.annotationfile.name])
        cmd = cmd_bam2sam + ['|', ] + cmd_count

        logger.debug(subprocess.list2cmdline(cmd))
        with open(os.devnull, "w") as fnull, \
             open(assets.target.counts.name, 'w') as output_fh:
            csv_w = csv.writer(output_fh)
            # HTSeq-count does not use column names in its output, unfortunately,
            # so we correct that
            csv_w.writerow(['ID','count'])
            with core.Popen(cmd_bam2sam, 
                            stdout=subprocess.PIPE,
                            stderr=fnull) as p_bam2sam:
                with p_bam2sam.stdout, core.Popen(cmd_count, 
                                                  stdin = p_bam2sam.stdout,
                                                  stdout = subprocess.PIPE,
                                                  stderr = subprocess.PIPE) as p_htseq:
                    p_bam2sam.stdout.close()
                    # read the output of HTSeq line-per-line
                    csv_r = csv.reader(p_htseq.stdout, delimiter='\t')
                    for row in csv_r:
                        csv_w.writerow(row)
                        p_htseq.stdout.flush()
                    stdout, stderr = p_htseq.communicate()
                if p_htseq.returncode != 0:
                    if sortedbam_fh is not None:
                        os.unlink(sortedbam_fh.name)
                    logger.error(subprocess.list2cmdline(cmd), extra={'stderr': stderr})
                    raise subprocess.CalledProcessError(p_htseq.returncode, cmd, None)
        if sortedbam_fh is not None:
            os.unlink(sortedbam_fh.name)
        return (cmd, p_htseq.returncode)

Steps performing several activities

Slightly more work than for the simple case will be required, but not too much either.

Unified execution

A new script, extending or overriding the default execution layer in the package can be written very simply:

from railroadtracks import core, rnaseq

class MyStep(core.Stepabstract):
   # Definition of a new step
   pass

NEW_STEPLIST_CLASSES = list()
for cls in rnaseq._STEPLIST_CLASSES:
    NEW_STEPLIST.append(cls)
NEW_STEPLIST_CLASS.append(MyStep)

if __name__ == '__main__':
    # use the command and subcommands parsing
    # with the new list of steps
    core.unified_exec(classlist = NEW_STEPLIST_CLASSES)