Recipes¶
The model, together with the persistence layer, is designed to make writing sequences of steps for RNA-Seq data processing simple, and to make switching to an alternative step (e.g., a different aligner or differential-expression method) trivial. It is also designed so that coexisting variants are something a user does not have to worry about (unless inclined to - the system is open).
Note
The general principles to remember are limited to:
- Steps require assets to run (and optionally parameters)
- Assets are constituted of two groups: source and target elements
- Parameters are optionally given
The bundle of source and target assets, parameters, and a step represents a task. Explanations about step and assets follow.
>>> source = SomeStep.Assets.Source( inputentity )
>>> target = SomeStep.Assets.Target( outputentity )
>>> assets = SomeStep.Assets(source, target)
>>> step = SomeStep( pathtoexecutable )
>>> step.run( assets )
Steps and assets¶
Concept¶
All steps in the process are connected through intermediate data files, which we call assets. Bioinformatics tools are almost always designed to operate on files, with pipes used only occasionally.
A step can be represented as the step itself with input files (the source assets) and output files (the target assets).
This group of nodes (here 4 nodes: 2 source, 1 step, 1 target) can also be collapsed as a step, and the representation of a workflow be the graph connecting this summary representation.
For example, the initial steps for aligner-based RNA-Seq can be represented as:
The Step-and-Assets graph will then look like this:
The nodes Indexed Genome and Aligned Reads are duplicated for clarity with the grouping of nodes, but it is the same entity saved on disk produced and used by the two steps.
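The sharing described above can be sketched in plain Python. The classes below are hypothetical stand-ins, not the railroadtracks API; the point is only that the target asset of the indexing step is literally the same entity used as a source by the alignment step.

```python
# Hypothetical sketch (not the railroadtracks API): two steps sharing
# one on-disk entity, the indexed genome.
from dataclasses import dataclass

@dataclass(frozen=True)
class Asset:
    path: str  # where the entity lives on disk

@dataclass
class Step:
    name: str
    sources: tuple
    targets: tuple

genome = Asset('/data/genome.fasta')
indexed = Asset('/data/genome.index')   # produced by the indexing step
reads = Asset('/data/reads.fastq')
aligned = Asset('/data/aligned.bam')

index_step = Step('index', sources=(genome,), targets=(indexed,))
align_step = Step('align', sources=(indexed, reads), targets=(aligned,))

# "Indexed Genome" appears in both steps, but it is one and the same entity:
assert align_step.sources[0] is index_step.targets[0]
```

Drawing the two steps with their assets, and collapsing each group of nodes, gives exactly the Step-and-Assets graph shown above.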
When using the framework, the easiest way to think about it is to start from a step (a child class of StepAbstract) and look for the attribute Assets. That attribute is a class representing the step's assets, and itself contains 2 class attributes Source and Target (themselves classes as well).
For example with the class modeling the aligner “STAR”:
from railroadtracks.rnaseq import StarIndex, StarAlign
# assets for the indexing of references (indexed for the alignment)
StarIndex.Assets
# assets for aligning reads against indexed references
StarAlign.Assets
The assets are divided into two sets, the Source and the Target, for the files the step is using and the files the step is producing, respectively. Source and Target are themselves classes.
from railroadtracks import rnaseq
from railroadtracks.rnaseq import StarIndex, StarAlign
# assets for the indexing of references (indexed for the alignment)
fastaref = rnaseq.SavedFASTA('/path/to/referencegenome.fasta')
indexedref = rnaseq.FilePattern('/path/to/indexprefix')
StarIndex.Assets(StarIndex.Assets.Source(fastaref),
                 StarIndex.Assets.Target(indexedref))
The result is somewhat verbose, but if an IDE or an advanced interactive shell such as ipython is used, autocompletion will make writing such statements relatively painless, and mostly intuitive. One can just start from the step considered (StarIndex in the example above) and everything can be derived from it.
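This kind of autocompletion-based discovery rests on ordinary Python introspection. The sketch below uses a hypothetical stand-in class (not the real StarIndex) to show that dir(), which autocompletion relies on, surfaces the nested Source and Target classes.

```python
# Hypothetical stand-in for a step class such as StarIndex
# (not the railroadtracks implementation).
class StarIndexLike:
    class Assets:
        class Source:
            pass
        class Target:
            pass

# dir() is what tab-completion is built on: the nested asset
# classes are discoverable without reading any documentation.
public = [name for name in dir(StarIndexLike.Assets)
          if not name.startswith('_')]
assert 'Source' in public and 'Target' in public
```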
Unspecified assets¶
In a traditional approach, for example a sequence of commands in bash, the name of files must be specified at each step.
We are proposing an alternative with which a full recipe can be written without having to take care of file names for derived data (which can be a relative burden when considering to process the same data in many alternative ways). Only the original data files such as the reference genome, or the experimental data from the samples and sequencing, are specified and the other file names will be generated automatically.
Objects inheriting from AssetSet are expected to have a method AssetSet.createundefined() that creates an undefined set of assets, and this can be used in recipes (see example below).
Foo = core.assetfactory('Foo', [core.AssetAttr('bar', core.File, '')])
# create an undefined set of assets of type Foo
undefoo = Foo.createundefined()
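One way a framework can generate derived file names automatically is sketched below. This is a hypothetical illustration (derived_path is not part of railroadtracks): hashing the step name and the source file names makes the mapping deterministic, so re-running the same recipe resolves to the same paths.

```python
# Hypothetical sketch of automatic file-name generation for derived data
# (not railroadtracks internals): the name is a function of the step and
# of its source files, so it never has to be written by hand.
import hashlib
import os

def derived_path(workdir, stepname, source_paths, suffix):
    h = hashlib.sha1()
    h.update(stepname.encode())
    for p in sorted(source_paths):   # order-independent
        h.update(p.encode())
    return os.path.join(workdir, h.hexdigest()[:12] + suffix)

p1 = derived_path('/wd', 'bowtie2build', ['/data/genome.fasta'], '.index')
p2 = derived_path('/wd', 'bowtie2build', ['/data/genome.fasta'], '.index')
assert p1 == p2  # deterministic: the same task maps to the same file
```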
Notes about the design¶
Data analysis is often a REPL activity, and we keep this in mind. The writing of recipes tries to provide:
- autocompletion-based discovery. Documentation is rarely read back-to-back (congratulations for making it this far, though), and scientists often proceed by trial-and-error with software. The package tries to provide benevolent support when doing so.
- failing early whenever possible (that is, before long computations have been performed)
- writing a full sequence of steps and running them unattended (tasks to be performed are stored, and executed whenever the user wants)
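The last point, storing tasks while the recipe is written and executing them later, can be sketched with a minimal stand-in Task class (not the railroadtracks Task):

```python
# Hypothetical sketch of deferred execution: tasks accumulate in a list
# as the recipe is written, and only run in a final loop.
class Task:
    def __init__(self, func, *args):
        self.func, self.args = func, args
        self.done = False
    def execute(self):
        self.result = self.func(*self.args)
        self.done = True

# writing the recipe: nothing is computed yet
torun = [Task(sum, [1, 2, 3]), Task(len, 'abc')]

# running it later, skipping tasks already performed
for task in torun:
    if not task.done:
        task.execute()
assert [t.result for t in torun] == [6, 3]
```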
Setup¶
# -- initialization boilerplate code
import tempfile
from railroadtracks import easy, rnaseq
wd = tempfile.mkdtemp()
project = easy.Project(rnaseq, wd=wd)
# declare the 3rd-party command-line tools we will use
env = easy.Environment(rnaseq)
The package is also able to generate a small dataset based on a phage:
import csv
import tempfile
import railroadtracks.model.simulate
# Phage genome shipped with the package for testing purposes
PHAGEFASTA = railroadtracks.model.simulate.PHAGEFASTA
PHAGEGFF = railroadtracks.model.simulate.PHAGEGFF
# create random data for 6 samples (just testing here)
nsamples = 6
samplereads = list()
with open(PHAGEFASTA) as fasta_fh:
    reference = next(railroadtracks.model.simulate.readfasta_iter(fasta_fh))
for sample_i in range(nsamples):
    read1_fh = tempfile.NamedTemporaryFile(prefix='read1', suffix='.fq')
    read2_fh = tempfile.NamedTemporaryFile(prefix='read2', suffix='.fq')
    read1_fh, read2_fh = railroadtracks.model.simulate.randomPEreads(read1_fh,
                                                                     read2_fh,
                                                                     reference)
    samplereads.append((read1_fh, read2_fh))
sampleinfo_fh = tempfile.NamedTemporaryFile(suffix='.csv', mode='w+')
csv_w = csv.writer(sampleinfo_fh)
csv_w.writerow(['sample_id', 'group'])
for i in range(6):
    csv_w.writerow([str(i), ('A','B')[i%2]])
sampleinfo_fh.flush()
referenceannotation = rnaseq.GFFFile(PHAGEGFF)
Simple recipe¶
# steps used
bowtie2index = env.activities.INDEX.bowtie2build
bowtie2align = env.activities.ALIGN.bowtie2
htseqcount = env.activities.QUANTIFY.htseqcount
merge = env.activities.UTILITY.columnmerger
edger = env.activities.DIFFEXP.edger
from railroadtracks import easy, hortator
# sequence of tasks to run
torun = list()
# index for alignment
Assets = bowtie2index.Assets
assets = Assets(Assets.Source(rnaseq.FASTAFile(reference_fn)),
                Assets.Target.createundefined())
task_index = project.add_task(bowtie2index, assets)
torun.append(task_index)
# process all samples
sample_counts = list()
for read1_fh, read2_fh in samplereads:
    # align
    Assets = bowtie2align.Assets
    assets = Assets(Assets.Source(task_index.call.assets.target.indexfilepattern,
                                  rnaseq.FASTQPossiblyGzipCompressed(read1_fh.name),
                                  rnaseq.FASTQPossiblyGzipCompressed(read2_fh.name)),
                    Assets.Target.createundefined())
    task_align = project.add_task(bowtie2align, assets)
    torun.append(task_align)
    # quantify
    # (non-default parameters to fit our demo GFF)
    params = rnaseq.HTSeqCount._noexons_parameters
    Assets = htseqcount.Assets
    assets = Assets(Assets.Source(task_align.call.assets.target.alignment,
                                  rnaseq.GFFFile(referenceannotation)),
                    Assets.Target.createundefined())
    task_quantify = project.add_task(htseqcount,
                                     assets,
                                     parameters=params)
    torun.append(task_quantify)
    # keep a pointer to the counts,
    # as we will use them in the merge step
    sample_counts.append(task_quantify.call.assets)
# merge the sample data into a table
# (so differential expression can be computed)
Assets = merge.Assets
counts = tuple(x.target.counts for x in sample_counts)
assets = Assets(Assets.Source(rnaseq.CSVFileSequence(counts)),
                merge.Assets.Target.createundefined())
task_merge = project.add_task(merge,
                              assets,
                              parameters=("0", "1"))
torun.append(task_merge)
# differential expression with edgeR
Assets = edger.Assets
assets = Assets(Assets.Source(task_merge.call.assets.target.counts,
                              rnaseq.CSVFile(sampleinfo_fh.name)),
                Assets.Target.createundefined())
task_de = project.add_task(edger, assets)
# run the tasks
for task in torun:
    # run only if not done
    if task.info[1] != hortator._TASK_DONE:
        task.execute()
# get results
final_storedentities = project.get_targetsofactivity(rnaseq.ACTIVITY.DIFFEXP)
# get the step that created the results files
final_steps = list()
for stored_entity in final_storedentities:
    final_steps.append(project.persistent_graph.get_parenttask_of_storedentity(stored_entity))
The variable wd contains the directory with all intermediate data and the final results, and db_fn is the database file.
Note
If you clean up after yourself, but want to run the next recipe in this documentation, the setup step will have to be run again.
Loops, nested loops, and many variants¶
from railroadtracks import easy
torun = list()
# bowtie
bowtie1index = env.activities.INDEX.bowtiebuild
bowtie1align = env.activities.ALIGN.bowtie
Assets = bowtie1index.Assets
fa_file = rnaseq.FASTAFile(reference_fn)
task_index_bowtie1 = project.add_task(bowtie1index,
                                      Assets(Assets.Source(fa_file),
                                             None))
torun.append(task_index_bowtie1)
# bowtie2
bowtie2index = env.activities.INDEX.bowtie2build
bowtie2align = env.activities.ALIGN.bowtie2
Assets = bowtie2index.Assets
fa_file = rnaseq.FASTAFile(reference_fn)
task_index_bowtie2 = project.add_task(bowtie2index,
                                      Assets(Assets.Source(fa_file),
                                             None))
torun.append(task_index_bowtie2)
# STAR
starindex = env.activities.INDEX.starindex
staralign = env.activities.ALIGN.staralign
Assets = starindex.Assets
fa_file = rnaseq.FASTAFile(reference_fn)
task_index_star = project.add_task(starindex,
                                   Assets(Assets.Source(fa_file),
                                          None))
torun.append(task_index_star)
# TopHat2
# (index from bowtie2 used)
#tophat2 = env.activities.ALIGN.tophat2
# featureCount
featurecount = env.activities.QUANTIFY.featurecount
# Merge columns (obtained from counting)
merge = env.activities.UTILITY.columnmerger
# EdgeR, DESeq, DESeq2, and LIMMA voom
edger = env.activities.DIFFEXP.edger
deseq = env.activities.DIFFEXP.deseq
deseq2 = env.activities.DIFFEXP.deseq2
voom = env.activities.DIFFEXP.limmavoom
# Now explore the different alignment presets in bowtie2, and vanilla star
from itertools import cycle
from collections import namedtuple
Options = namedtuple('Options', 'aligner assets_index parameters')
# Try various presets for bowtie2
bowtie2_parameters = (('--very-fast', ), ('--fast', ),
                      ('--sensitive', ), ('--very-sensitive', ))
options = [Options(*x) for x in zip(cycle((bowtie2align,)),
                                    cycle((task_index_bowtie2.call.assets.target,)),
                                    bowtie2_parameters)]
# add bowtie
options.append(Options(bowtie1align, task_index_bowtie1.call.assets.target, tuple()))
# add STAR (vanilla, no specific options beside the size of index k-mers)
options.append(Options(staralign,
                       task_index_star.call.assets.target,
                       ('--genomeChrBinNbits', '12')))
# add TopHat2
#options.append(Options(tophat2, task_index_bowtie2.call.assets.target, tuple()))
# loop over the options
for option in options:
    sample_counts = list()
    # loop over the samples
    for sample_i in range(nsamples):
        read1_fh, read2_fh = samplereads[sample_i]
        # align
        Assets = option.aligner.Assets
        assets = Assets(Assets.Source(option.assets_index.indexfilepattern,
                                      rnaseq.FASTQPossiblyGzipCompressed(read1_fh.name),
                                      rnaseq.FASTQPossiblyGzipCompressed(read2_fh.name)),
                        Assets.Target.createundefined())
        task_align = project.add_task(option.aligner,
                                      assets,
                                      parameters=option.parameters)
        torun.append(task_align)
        # quantify
        # (non-default parameters to fit our demo GFF)
        Assets = featurecount.Assets
        assets = Assets(Assets.Source(task_align.call.assets.target.alignment,
                                      rnaseq.GFFFile(referenceannotation)),
                        Assets.Target.createundefined())
        task_quantify = project.add_task(featurecount,
                                         assets,
                                         parameters=('--gtf-featuretype', 'CDS',
                                                     '--gtf-attrtype', 'ID'))
        torun.append(task_quantify)
        # keep a pointer to the counts, as we will use it in the merge step
        sample_counts.append(task_quantify.call.assets)
    # merge the sample data into a table (so differential expression can be computed)
    Assets = merge.Assets
    source = Assets.Source(rnaseq.CSVFileSequence(tuple(x.target.counts
                                                        for x in sample_counts)))
    assets_merge = Assets(source,
                          Assets.Target.createundefined())
    task_merge = project.add_task(merge,
                                  assets_merge,
                                  parameters=("0", "1"))
    torun.append(task_merge)
    # differential expression with edgeR, deseq2, and voom
    # (deseq is too whimsical for tests)
    for diffexp, params in ((edger, ()),
                            (deseq, ('--dispersion-fittype=local', )),
                            (deseq2, ()),
                            (voom, ())):
        Assets = diffexp.Assets
        assets = Assets(Assets.Source(task_merge.call.assets.target.counts,
                                      core.File(sampleinfo_fh.name)),
                        Assets.Target.createundefined())
        task_de = project.add_task(diffexp, assets)
        torun.append(task_de)
# run the tasks
# (this is an integration test rather than a unit test - the
# 3rd-party tools are often brittle and we want to keep the noise level down)
env_log_level = environment.logger.level
environment.logger.level = logging.ERROR
try:
    for task in torun:
        if task.info[1] != hortator._TASK_DONE:
            try:
                task.execute()
                status = easy.hortator._TASK_DONE
            except:
                status = easy.hortator._TASK_FAILED
            project.persistent_graph.step_concrete_state(
                hortator.DbID(task.task_id, False),
                easy.hortator._TASK_STATUS_LIST[status])
finally:
    environment.logger.level = env_log_level
Docstrings¶
Package aiming at making common operations “easy”.
class railroadtracks.easy.ActivityCount(count, name, status)

    count
        Alias for field number 0
    name
        Alias for field number 1
    status
        Alias for field number 2

class railroadtracks.easy.Asset(project, savedentity, asset_id)

    An asset is either used by a task (then it is a source asset) or is produced by a task (then it is a target asset).

    entity
        Instance of SavedEntityAbstract (or rather a child class thereof).

class railroadtracks.easy.Environment(model)

    Represent the current environment in a (presumably) easy way for writing recipes.

    activities
        Access the activities declared by the model as attributes.
    stepclasses
        Steps.
    stepinstances
        Default instances for the steps (created from the default executables in the PATH).

class railroadtracks.easy.Project(model, wd='railroadtracks_project', db_fn=None, force_create=False, isolation_level=None, cached=False)

    A project, that is a directory containing data as well as a persistent storage of steps and of how derived data and final results were obtained.

    __init__(model, wd='railroadtracks_project', db_fn=None, force_create=False, isolation_level=None, cached=False)
        Parameters:
        - model: module with the definition of the model (that is, the classes defining the types of tasks computed)
        - wd (str): name of a working directory (where all intermediate and final results will be saved)
        - db_fn (str or None): name of a file for the database. If None, use the file "railroadtracks.db" in the directory specified in "wd".
        - isolation_level: passed to the constructor for PersistentTaskGraph
        Return type: a tuple (hortator.StepGraph, working directory as a str, file name for the database as a str)

    add_task(step, assets, parameters=(), tag=1)
        Add a task to the project. If any of the assets' targets is not defined, it will be defined automatically.
        Parameters:
        - step
        - assets
        - parameters
        - tag: a tag (to differentiate repetitions of the exact same task)

    cache
        Cache for the dependency graph. This may not reflect changes made by an independent access to the project or by a direct access to the persistent graph (see persistent_graph), but it will be much faster and is the preferred way to access the graph when there is only one concurrent access.

    db_fn
        Path to the database file.

    get_targetsofactivity(activity)
        Retrieve the targets of steps performing a specific activity (calls the method of the same name in the contained PersistentTaskGraph).
        Parameters: activity (Enum): an activity

    get_targetsoftype(obj)
        Retrieve the targets having a given type (calls the method of the same name in the contained PersistentTaskGraph).
        Parameters: obj (type, Object, or str): a type, an instance, or a type name

    get_tasksoftype(taskorstep)
        Retrieve tasks matching the type of "taskorstep". The parameter 'taskorstep' can be either:
        - a class inheriting from StepAbstract
        - an instance of Task
        - an instance of StepAbstract
        In the two last cases, the path to the executable and the version number are used to filter the tasks returned.

    iter_srcassets(task)
        Return the source files for a given task (calls the method of the same name in the contained PersistentTaskGraph).
        Parameters: task: a Task

    iter_targetassets(task)
        Return the target files for a given task (calls the method of the same name in the contained PersistentTaskGraph).
        Parameters: task: a Task

    model
        Model used in the project.

    newproject
        Tell whether this is a new project, rather than an existing project (re)opened.

    persistent_graph
        Access to the persistent graph. This will be slower than accessing the cached version, but will always be accurate.

    wd
        Working directory. The directory in which derived data is stored.
Troubleshooting¶
The standard Python module logging is used for logging, and its documentation should be checked.
For example, a very simple way to activate logging at the level DEBUG on stdout:
import railroadtracks
logger = railroadtracks.logger
logger.setLevel(railroadtracks.logging.DEBUG)
railroadtracks.basicConfig('/tmp/stdout')