Extending the framework¶
Writing custom steps¶
Simple case¶
A lot of the boilerplate handling is handled by parent classes. A custom class for a new tool will have to implement code to:
- create an instance (constructor)
- get the version
- run the step
The base class for steps is the abstract class railroadtracks.core.AssetStep
:
class StepAbstract(with_metaclass(abc.ABCMeta, object)):
""" Abstract parent for steps. """
# monicker under which the step will be known (must be unique within a step list)
_name = abc.abstractproperty()
# default name for executable associated with the class
_default_execpath = abc.abstractproperty()
# class of assets for the step (must be a child of :class:`AssetsStep`)
Assets = abc.abstractproperty()
activities = abc.abstractproperty(None, None, None,
"Activities associated with the step.")
version = abc.abstractproperty(None, None, None,
"Version of the executable associated with the step.")
@property
def hashdb(self):
return hash((type(self), self._default_execpath, self.version, self.activities))
@abc.abstractmethod
def run(self, assets, parameters=tuple()):
"""
:param assets: Assets to run the step with
:type assets: :class:`AssetsStep`
:param parameters: optional parameters
"""
raise NotImplementedError(_NOTIMPLEMENTED_ABSTRACT)
def uei(self, assets, parameters = tuple()):
# FIXME: should return the unified execution command line
uei = UnifiedExecInfo(self._execpath, self._name,
assets.source, assets.target, parameters,
None, None) #logging_file and logging_level
return uei
Custom steps will just have to implement the methods and properties.
For example, counting the reads after alignment with htseq-count is defined as a
child class of railroadtracks.model.quantify.QuantifierAbstract
, where Assets
are defined:
class QuantifierAbstract(core.StepAbstract):
__metaclass__ = abc.ABCMeta
Assets = AssetsQuantifier
activities = (ACTIVITY.QUANTIFY, )
Note
There is a class for each activity defined (see Activities),
and these can be used as parent
class for new steps performing one activity
The Assets
is inherited from its parent class, and does not need to be specified further.
class AssetsQuantifier(core.AssetsStep):
Source = core.assetfactory('Source', [core.AssetAttr('alignedreads', BAMFile, ''),
core.AssetAttr('annotationfile', GFFFile, '')])
Target = core.assetfactory('Target', [core.AssetAttr('counts', CSVFile, '')])
One can note that specific subclasses of railroadtracks.core.SavedEntityAbstract
can be specified
(here CSVFile
to indicate that file produced by the counting step is a CSV file).
The definition of assets represents a way to add a static typing flavor to Python,
as a number of checks are performed behind the hood by railroadtracks.model.files
.
The definition of railroadtracks.model.quantify.HTSeqCount
is then implementing
the remaining methods and attributes
class HTSeqCount(QuantifierAbstract):
_name = 'htseqcount'
_default_execpath = 'htseq-count'
_separator = '\t'
# set of parameters to get htseq-count to work with references such
# as bacteria or viruses (stored as a class attribute for convenience)
_noexons_parameters = ('--type=CDS', '--idattr=db_xref', '--stranded=no')
def __init__(self, executable=None):
if executable is None:
executable = type(self)._default_execpath
self._execpath = executable
self._version = None
@property
def version(self):
if self._version is None:
cmd = [self._execpath, '-h']
try:
logger.debug(subprocess.list2cmdline(cmd))
res = subprocess.check_output(cmd)
except OSError as ose:
raise unifex.UnifexError("""Command: %s
%s""" % (' '.join(cmd), ose))
m = re.match('^.*( version )?([^ \n]+)\.$', res.split('\n')[-2])
if m is None:
raise RuntimeError('Could not find the version number.')
self._version = m.groups()[1]
return self._version
def run(self, assets, parameters = tuple()):
# FIXME: shouldn't strandedness be a better part of the model ?
source = assets.source
sortedbam = source.alignedreads
if not isinstance(source.alignedreads, BAMFileSortedByID):
# htseq-count requires sorted entries
warnings.warn(("The source asset '%s' should ideally be sorted by read IDs. " +\
"We are sorting the file; use explicitly a '%s' rather than a '%s' "+\
"for better performances, as well as for reproducibility issues "+\
"(the sorting will use whatever 'samtools` is first found in the PATH)") \
% ("alignedreads", BAMFileSortedByID.__name__, BAMFile.__name__))
output_dir = os.path.dirname(assets.target.counts.name)
# temp file name for the sorted output
sortedbam_fh = tempfile.NamedTemporaryFile(dir=output_dir, suffix=".bam", delete=False)
# (cleaning temp files handled by Python, except sortedsam)
# -- sort
sorter = SamtoolsSorterByID()
sorter_assets = sorter.Assets(sorter.Assets.Source(source.alignedreads),
sorter.Assets.Target(BAMFile(sortedbam_fh.name)))
sorter.run(sorter_assets)
# sanity check:
if os.stat(sorter_assets.target.sortedbam.name).st_size == 0:
warnings.warn('The sorted BAM file is empty.')
sortedbam = sorter_assets.target.sortedbam
else:
sortedbam_fh = None
# BAM to SAM
cmd_bam2sam = ['samtools', 'view', sortedbam.name]
# build command line
cmd_count = [self._execpath, ]
cmd_count.extend(parameters)
cmd_count.extend(['-', source.annotationfile.name])
cmd = cmd_bam2sam + ['|', ] + cmd_count
logger.debug(subprocess.list2cmdline(cmd))
with open(os.devnull, "w") as fnull, \
open(assets.target.counts.name, 'w') as output_fh:
csv_w = csv.writer(output_fh)
# HTSeq-count does not use column names in its output, unfortunately,
# so we correct that
csv_w.writerow(['ID','count'])
with core.Popen(cmd_bam2sam,
stdout=subprocess.PIPE,
stderr=fnull) as p_bam2sam:
with p_bam2sam.stdout, core.Popen(cmd_count,
stdin = p_bam2sam.stdout,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE) as p_htseq:
p_bam2sam.stdout.close()
# read the output of HTSeq line-per-line
csv_r = csv.reader(p_htseq.stdout, delimiter='\t')
for row in csv_r:
csv_w.writerow(row)
p_htseq.stdout.flush()
stdout, stderr = p_htseq.communicate()
if p_htseq.returncode != 0:
if sortedbam_fh is not None:
os.unlink(sortedbam_fh.name)
logger.error(subprocess.list2cmdline(cmd), extra={'stderr': stderr})
raise subprocess.CalledProcessError(p_htseq.returncode, cmd, None)
if sortedbam_fh is not None:
os.unlink(sortedbam_fh.name)
return (cmd, p_htseq.returncode)
Steps performing several activities¶
Slightly more work than for the simple case will be required, but not too much either.
Unified execution¶
A new script, extending or overriding the default execution layer in the package can be written very simply:
from railroadtracks import core, rnaseq
class MyStep(core.Stepabstract):
# Definition of a new step
pass
NEW_STEPLIST_CLASSES = list()
for cls in rnaseq._STEPLIST_CLASSES:
NEW_STEPLIST.append(cls)
NEW_STEPLIST_CLASS.append(MyStep)
if __name__ == '__main__':
# use the command and subcommands parsing
# with the new list of steps
core.unified_exec(classlist = NEW_STEPLIST_CLASSES)