Recipes

The model, together with the persistence layer, is designed to make writing sequences of steps for RNA-Seq data processing simple, and to make switching to an alternative step (e.g., a different aligner or differential-expression method) trivial. It is also designed so that coexisting variants are something a user does not have to worry about (unless so inclined; the system is open).

Note

The general principles to remember are limited to:

  • Steps require assets to run (and optionally parameters)
  • Assets consist of two groups: source and target elements
  • Parameters are optionally given

The bundle of a step, its source and target assets, and parameters represents a task. Steps and assets are explained below.

>>> source = SomeStep.Assets.Source( inputentity )
>>> target = SomeStep.Assets.Target( outputentity )
>>> assets = SomeStep.Assets(source, target)
>>> step = SomeStep( pathtoexecutable )
>>> step.run( assets )
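The pattern above can be sketched as a self-contained toy in plain Python. The names below (`SomeStep`, the entity strings, the executable path) are illustrative only; the package's real steps derive from StepAbstract and carry much more machinery.

```python
# Toy, self-contained version of the step/assets pattern (illustrative only).

class Source:
    def __init__(self, *entities):
        self.entities = entities

class Target:
    def __init__(self, *entities):
        self.entities = entities

class Assets:
    # the step's Assets class exposes Source and Target as class attributes
    Source = Source
    Target = Target
    def __init__(self, source, target):
        self.source = source
        self.target = target

class SomeStep:
    Assets = Assets
    def __init__(self, executable):
        self.executable = executable
    def run(self, assets):
        # a real step would invoke self.executable on the source entities
        # and write the target entities; here we only report what would run
        return (self.executable,) + assets.source.entities

source = SomeStep.Assets.Source('input.fastq')
target = SomeStep.Assets.Target('output.bam')
assets = SomeStep.Assets(source, target)
step = SomeStep('/usr/bin/sometool')
result = step.run(assets)
```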

Steps and assets

Concept

All steps in the process are connected through intermediate data files, which we call assets. Bioinformatics tools are almost always designed to operate on files, with pipes used only occasionally.

A step can be represented as the step itself with input files (the source assets) and output files (the target assets).

digraph ExampleAssets {
  "src1"->"step";
  "src1"[label="Some data", shape=invhouse, style=filled, fillcolor=lightgrey];
  "src2"->"step";
  "src2"[label="Other data", shape=invhouse, style=filled, fillcolor=lightgrey];
  "step"[label="Step"];
  "step"->"tgt1";
  "tgt1"[label="Product", shape=box];
}

This group of nodes (here 4 nodes: 2 sources, 1 step, 1 target) can also be collapsed into a single step node, and a workflow can then be represented as the graph connecting these summary representations.

For example, the initial steps for aligner-based RNA-Seq can be represented as:

digraph HighLevel {
  subgraph cluster_3 {
    label="High-level";
    style=filled;
    color=lightgrey;
    node [style=filled,color=white];
    "Index"->"Align"->"Count";
  }
}

The Step-and-Assets graph will then look like this:

digraph StepAssets {
  subgraph cluster_0 {
    label="Index";
    color=lightgrey;
    "ReferenceGenome"->"Indexer";
    "ReferenceGenome"[label="Reference Genome", shape=invhouse, colorscheme=set36, style=filled, fillcolor=1];
    "Indexer"->"IndexedGenomeS";
    "Indexer"[colorscheme=set36, style=filled, fillcolor=2];
    "IndexedGenomeS"[label="Indexed Genome", shape=box, colorscheme=set36, style=filled, fillcolor=2, penwidth=2];
  }
  subgraph cluster_1 {
    label="Align";
    color=lightgrey;
    "IndexedGenomeT"->"Aligner";
    "IndexedGenomeT"[label="Indexed Genome", shape=box, colorscheme=set36, style=filled, fillcolor=2, penwidth=2];
    "Reads1" -> "Aligner";
    "Reads1"[label="Reads 1", shape=invhouse, colorscheme=set36, style=filled, fillcolor=5];
    "Reads2" -> "Aligner";
    "Reads2"[label="Reads 2", shape=invhouse, colorscheme=set36, style=filled, fillcolor=5];
    "Aligner" -> "AlignedReadsS";
    "Aligner"[colorscheme=set36, style=filled, fillcolor=2];
    "AlignedReadsS"[label="Aligned Reads", shape=box, colorscheme=set36, style=filled, fillcolor=2, penwidth=2];
  }
  "IndexedGenomeS"->"IndexedGenomeT"[style="dotted",arrowhead="none"];
  subgraph cluster_2 {
    label="Count";
    color=lightgrey;
    "AlignedReadsT"->"Counter";
    "AlignedReadsT"[label="Aligned Reads", shape=box, colorscheme=set36, style=filled, fillcolor=2, penwidth=2];
    "ReferenceAnnotation" -> "Counter";
    "ReferenceAnnotation"[label="Reference Annotation", shape=invhouse, colorscheme=set36, style=filled, fillcolor=5];
    "Counter" -> "DigitalGeneExpression";
    "Counter"[colorscheme=set36, style=filled, fillcolor=2];
    "DigitalGeneExpression"[label="Digital Gene Expression", shape=box, colorscheme=set36, style=filled, fillcolor=2, penwidth=2];
  }
  "AlignedReadsS"->"AlignedReadsT"[style="dotted",arrowhead="none"];
}

The nodes Indexed Genome and Aligned Reads are duplicated for clarity in the grouping of nodes, but each pair refers to the same entity saved on disk, produced by one step and used by the next.

When using the framework, the easiest way to think about it is to start from a step (a child class of StepAbstract) and look for its attribute Assets. That attribute is a class representing the step’s assets, and itself contains two class attributes, Source and Target (themselves classes as well).

For example with the class modeling the aligner “STAR”:

from railroadtracks.rnaseq import StarIndex, StarAlign

# assets for the indexing of references (indexed for the alignment)
StarIndex.Assets
# assets for aligning reads against indexed references
StarAlign.Assets

The assets are divided into two sets, Source and Target: the files the step uses and the files the step produces, respectively. Source and Target are themselves classes.

from railroadtracks import rnaseq
from railroadtracks.rnaseq import StarIndex, StarAlign

# assets for the indexing of references (indexed for the alignment)
fastaref = rnaseq.SavedFASTA('/path/to/referencegenome.fasta')
indexedref = rnaseq.FilePattern('/path/to/indexprefix')
StarIndex.Assets(StarIndex.Assets.Source(fastaref),
                 StarIndex.Assets.Target(indexedref))

The calls are somewhat verbose, but with an IDE or an advanced interactive shell such as IPython, autocompletion makes writing such statements relatively painless and mostly intuitive: one can start from the step considered (StarIndex in the example above) and derive everything from it.

Unspecified assets

In a traditional approach, for example a sequence of commands in bash, the names of files must be specified at each step.

We propose an alternative in which a full recipe can be written without having to take care of file names for derived data (which can be a real burden when processing the same data in many alternative ways). Only the original data files, such as the reference genome or the experimental data from the samples and sequencing, are specified; the other file names are generated automatically.

Objects inheriting from AssetSet are expected to have a method AssetSet.createundefined() that creates an undefined set of assets; this can be used in recipes (see the example below).

        Foo = core.assetfactory('Foo', [core.AssetAttr('bar', core.File, '')])
        # create an undefined set of assets of type Foo
        undefoo = Foo.createundefined()
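A toy version of such a factory can make the semantics concrete: an undefined asset receives an automatically generated placeholder that the framework fills in later. This is a sketch only; the real core.assetfactory and core.File are richer, and the real package delegates unique-name generation to its persistence layer.

```python
import itertools

_counter = itertools.count()

class File:
    """Toy stand-in for the package's core.File."""
    def __init__(self, name):
        self.name = name

def assetfactory(clsname, fields):
    """Toy stand-in for core.assetfactory: a class with named File attributes."""
    class AssetSet:
        def __init__(self, *values):
            for field, value in zip(fields, values):
                setattr(self, field, value)
        @classmethod
        def createundefined(cls):
            # undefined assets get auto-generated placeholder names
            return cls(*(File('autogenerated_%i' % next(_counter))
                         for _ in fields))
    AssetSet.__name__ = clsname
    return AssetSet

Foo = assetfactory('Foo', ['bar'])
# create an undefined set of assets of type Foo
undefoo = Foo.createundefined()
# undefoo.bar.name is an automatically generated placeholder
```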

Notes about the design

Data analysis is often a REPL activity, and we keep this in mind. The writing of recipes tries to provide:

  • autocompletion-based discovery. Documentation is rarely read back-to-back (congratulations for making it this far, though), and scientists often proceed by trial-and-error with software. The package tries to provide benevolent support when doing so.
  • failing early whenever possible (that is, before long computations have been performed)
  • writing a full sequence of steps and running them unattended (tasks to be performed are stored, and executed whenever the user wants)
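"Failing early" mostly means validating a task when it is declared, not when it eventually runs. A sketch of the idea (the helper below is hypothetical, not part of the package's API):

```python
import os
import tempfile

def add_task_checked(torun, step_name, source_paths):
    """Validate source assets at declaration time (hypothetical helper)."""
    missing = [p for p in source_paths if not os.path.exists(p)]
    if missing:
        # complain now, not hours into a pipeline run
        raise ValueError('missing source file(s): %s' % ', '.join(missing))
    torun.append((step_name, source_paths))

torun = []
with tempfile.NamedTemporaryFile(suffix='.fasta') as fh:
    add_task_checked(torun, 'index', [fh.name])      # fine: the file exists
try:
    add_task_checked(torun, 'align', ['/no/such/file.fq'])
except ValueError:
    pass  # caught at declaration time, before any long computation ran
```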

Setup

        # -- initialization boilerplate code
        import tempfile

        from railroadtracks import easy, rnaseq

        wd = tempfile.mkdtemp()
        project = easy.Project(rnaseq, wd=wd)

        # declare the 3rd-party command-line tools we will use
        env = easy.Environment(rnaseq)

The package can also generate a small dataset based on a phage:

        # Phage genome shipped with the package for testing purposes
        import csv
        import tempfile

        import railroadtracks.model.simulate

        PHAGEFASTA = railroadtracks.model.simulate.PHAGEFASTA
        PHAGEGFF = railroadtracks.model.simulate.PHAGEGFF

        # create random data for 6 samples (just testing here)
        nsamples = 6
        samplereads = list()
        with open(PHAGEFASTA) as fasta_fh:
            reference = next(railroadtracks.model.simulate.readfasta_iter(fasta_fh))
        for sample_i in range(nsamples):
            read1_fh = tempfile.NamedTemporaryFile(prefix='read1', suffix='.fq')
            read2_fh = tempfile.NamedTemporaryFile(prefix='read2', suffix='.fq')
            read1_fh, read2_fh = railroadtracks.model.simulate.randomPEreads(read1_fh,
                                                                             read2_fh, 
                                                                             reference)
            samplereads.append((read1_fh, read2_fh))

        sampleinfo_fh = tempfile.NamedTemporaryFile(suffix='.csv', mode='w+')
        csv_w = csv.writer(sampleinfo_fh)
        csv_w.writerow(['sample_id', 'group'])
        for i in range(nsamples):
            csv_w.writerow([str(i), ('A','B')[i%2]])
        sampleinfo_fh.flush()
        referenceannotation = rnaseq.GFFFile(PHAGEGFF)
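For reference, the FASTA iteration used above can be sketched with a simplified reader in the spirit of readfasta_iter (this is a sketch, not the packaged function, which handles more cases):

```python
import io

def readfasta_iter(fh):
    """Yield (header, sequence) tuples from a FASTA file handle (simplified)."""
    header, chunks = None, []
    for line in fh:
        line = line.rstrip()
        if line.startswith('>'):
            if header is not None:
                yield (header, ''.join(chunks))
            header, chunks = line[1:], []
        else:
            chunks.append(line)
    if header is not None:
        yield (header, ''.join(chunks))

fasta = io.StringIO('>phage\nACGT\nTTGA\n')
reference = next(readfasta_iter(fasta))
# reference == ('phage', 'ACGTTTGA')
```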

Simple recipe

        from railroadtracks import easy, hortator

        # steps used
        bowtie2index = env.activities.INDEX.bowtie2build
        bowtie2align = env.activities.ALIGN.bowtie2
        htseqcount = env.activities.QUANTIFY.htseqcount
        merge = env.activities.UTILITY.columnmerger
        edger = env.activities.DIFFEXP.edger

        # sequence of tasks to run
        torun = list()
                            
        # index for alignment
        Assets = bowtie2index.Assets
        assets = Assets(Assets.Source(rnaseq.FASTAFile(reference_fn)),
                        Assets.Target.createundefined())
        task_index = project.add_task(bowtie2index, assets)
        torun.append(task_index)

        # process all samples
        sample_counts = list()
        for read1_fh, read2_fh in samplereads:
            # align
            Assets = bowtie2align.Assets
            assets = Assets(Assets.Source(task_index.call.assets.target.indexfilepattern, 
                                          rnaseq.FASTQPossiblyGzipCompressed(read1_fh.name),
                                          rnaseq.FASTQPossiblyGzipCompressed(read2_fh.name)),
                            Assets.Target.createundefined())
            task_align = project.add_task(bowtie2align, assets)
            torun.append(task_align)

            # quantify
            # (non-default parameters to fit our demo GFF)
            params = rnaseq.HTSeqCount._noexons_parameters
            Assets = htseqcount.Assets
            assets = Assets(Assets.Source(task_align.call.assets.target.alignment,
                                          referenceannotation),
                            Assets.Target.createundefined())
            task_quantify = project.add_task(htseqcount,
                                             assets,
                                             parameters=params)
            torun.append(task_quantify)
            # keep a pointer to the counts,
            # as we will use them in the merge step
            sample_counts.append(task_quantify.call.assets)

        # merge the sample data into a table
        # (so differential expression can be computed)
        Assets = merge.Assets
        counts = tuple(x.target.counts for x in sample_counts)
        assets = Assets(Assets.Source(rnaseq.CSVFileSequence(counts)),
                        merge.Assets.Target.createundefined())
        task_merge = project.add_task(merge,
                                      assets,
                                      parameters=("0","1"))
        torun.append(task_merge)

        # differential expression with edgeR
        Assets = edger.Assets
        assets = Assets(Assets.Source(task_merge.call.assets.target.counts,
                                      rnaseq.CSVFile(sampleinfo_fh.name)),
                        Assets.Target.createundefined())
        task_de = project.add_task(edger,
                                   assets)

        # run the tasks
        for task in torun:
            # run only if not done
            if task.info[1] != hortator._TASK_DONE:
                task.execute()

        # get results
        final_storedentities = project.get_targetsofactivity(rnaseq.ACTIVITY.DIFFEXP)

        # get the step that created the results files
        final_steps = list()
        for stored_entity in final_storedentities:
            final_steps.append(project.persistent_graph.get_parenttask_of_storedentity(stored_entity))
        

The variable wd contains the directory with all intermediate data and the final results, and db_fn is the database file.

Note

If you clean up after yourself but want to run the next recipe in this documentation, the setup step will have to be run again.

Loops, nested loops, and many variants

        import logging

        from railroadtracks import core, easy, environment, hortator

        torun = list()

        # bowtie
        bowtie1index = env.activities.INDEX.bowtiebuild
        bowtie1align = env.activities.ALIGN.bowtie
        Assets = bowtie1index.Assets
        fa_file = rnaseq.FASTAFile(reference_fn)
        task_index_bowtie1 = project.add_task(bowtie1index, 
                                              Assets(Assets.Source(fa_file),
                                                     None))
        torun.append(task_index_bowtie1)

        # bowtie2
        bowtie2index = env.activities.INDEX.bowtie2build
        bowtie2align = env.activities.ALIGN.bowtie2
        Assets = bowtie2index.Assets
        fa_file = rnaseq.FASTAFile(reference_fn)
        task_index_bowtie2 = project.add_task(bowtie2index,
                                              Assets(Assets.Source(fa_file),
                                                     None))
        torun.append(task_index_bowtie2)

        # STAR
        starindex = env.activities.INDEX.starindex
        staralign = env.activities.ALIGN.staralign
        Assets = starindex.Assets
        fa_file = rnaseq.FASTAFile(reference_fn)
        task_index_star = project.add_task(starindex, 
                                           Assets(Assets.Source(fa_file),
                                                  None))
        torun.append(task_index_star)

        # TopHat2
        # (index from bowtie2 used)
        #tophat2 = env.activities.ALIGN.tophat2

        # featureCount
        featurecount = env.activities.QUANTIFY.featurecount

        # Merge columns (obtained from counting)
        merge = env.activities.UTILITY.columnmerger

        # EdgeR, DESeq, DESeq2, and LIMMA voom
        edger = env.activities.DIFFEXP.edger
        deseq = env.activities.DIFFEXP.deseq
        deseq2 = env.activities.DIFFEXP.deseq2
        voom = env.activities.DIFFEXP.limmavoom
        

        # Now explore the different alignment presets in bowtie2, and vanilla STAR
        from itertools import cycle
        from collections import namedtuple
        Options = namedtuple('Options', 'aligner assets_index parameters')
        # Try various presets for bowtie2
        bowtie2_parameters = (('--very-fast', ), ('--fast', ), 
                              ('--sensitive', ), ('--very-sensitive', ))
        options = [Options(*x) for x in zip(cycle((bowtie2align,)),
                                            cycle((task_index_bowtie2.call.assets.target,)),
                                            bowtie2_parameters)]

        # add bowtie
        options.append(Options(bowtie1align, task_index_bowtie1.call.assets.target, tuple()))
        # add STAR (vanilla, no specific options beside the size of index k-mers)
        options.append(Options(staralign, 
                               task_index_star.call.assets.target, 
                               ('--genomeChrBinNbits', '12')))
        # add TopHat2
        #options.append(Options(tophat2, task_index_bowtie2.call.assets.target, tuple()))

        # loop over the options
        for option in options:
            sample_counts = list()
            # loop over the samples
            for sample_i in range(nsamples):
                read1_fh, read2_fh = samplereads[sample_i]
                # align
                Assets = option.aligner.Assets
                assets = Assets(Assets.Source(option.assets_index.indexfilepattern,
                                              rnaseq.FASTQPossiblyGzipCompressed(read1_fh.name), 
                                              rnaseq.FASTQPossiblyGzipCompressed(read2_fh.name)),
                                Assets.Target.createundefined())
                task_align = project.add_task(option.aligner,
                                              assets,
                                              parameters=option.parameters)
                torun.append(task_align)

                # quantify
                # (non-default parameters to fit our demo GFF)
                Assets = featurecount.Assets
                assets = Assets(Assets.Source(task_align.call.assets.target.alignment,
                                              referenceannotation),
                                Assets.Target.createundefined())
                task_quantify = project.add_task(featurecount,
                                                 assets,
                                                 parameters = ('--gtf-featuretype', 'CDS',
                                                               '--gtf-attrtype', 'ID'))
                torun.append(task_quantify)

                # keep a pointer to the counts, as we will use it in the merge step
                sample_counts.append(task_quantify.call.assets)

            # merge the sample data into a table (so differential expression can be computed)
            Assets = merge.Assets
            source = Assets.Source(rnaseq.CSVFileSequence(tuple(x.target.counts\
                                                                for x in sample_counts)))
            assets_merge = Assets(source,
                                  Assets.Target.createundefined())
            task_merge = project.add_task(merge,
                                          assets_merge,
                                          parameters=("0","1"))
            torun.append(task_merge)

            # differential expression with edgeR, deseq2, and voom
            # (deseq is too whimsical for tests)
            for diffexp, params in ((edger, ()),
                                    (deseq, ('--dispersion-fittype=local', )), 
                                    (deseq2, ()),
                                    (voom, ())):
                Assets = diffexp.Assets
                assets = Assets(Assets.Source(task_merge.call.assets.target.counts,
                                              core.File(sampleinfo_fh.name)),
                                Assets.Target.createundefined())
                task_de = project.add_task(diffexp,assets)
                torun.append(task_de)

        # run the tasks
        # (this is an integration test rather than a unit test - the 
        # 3rd-party tools are often brittle and we want to keep the noise level down)
        env_log_level = environment.logger.level
        environment.logger.level = logging.ERROR
        try:
            for task in torun:
                if task.info[1] != hortator._TASK_DONE:
                    try:
                        task.execute()
                        status = easy.hortator._TASK_DONE
                    except Exception:
                        status = easy.hortator._TASK_FAILED
                project.persistent_graph.step_concrete_state(hortator.DbID(task.task_id, False),
                                                             easy.hortator._TASK_STATUS_LIST[status])
        finally:
            environment.logger.level = env_log_level
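The zip/cycle idiom used above to pair a single aligner and index with several parameter presets can be seen in isolation (plain strings stand in for the step and asset objects):

```python
from collections import namedtuple
from itertools import cycle

Options = namedtuple('Options', 'aligner assets_index parameters')

# one aligner and one index, paired with each parameter preset:
# cycle() repeats the singletons, and zip() stops at the shortest
# iterable, i.e. after the last preset
presets = (('--very-fast',), ('--fast',), ('--sensitive',))
options = [Options(*x) for x in zip(cycle(('bowtie2',)),
                                    cycle(('index_b2',)),
                                    presets)]
# three Options, all sharing the same aligner and index
```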

Docstrings

Package aiming at making common operations “easy”.

class railroadtracks.easy.ActivityCount(count, name, status)
count

Alias for field number 0

name

Alias for field number 1

status

Alias for field number 2

class railroadtracks.easy.Asset(project, savedentity, asset_id)[source]

An asset is either used by a task (then it is a source asset) or is produced by a task (then it is a target asset).

entity

Instance of SavedEntityAbstract (or rather a child class thereof).

class railroadtracks.easy.Environment(model)[source]

Represent the current environment in a (presumably) easy way for writing recipes.

__init__(model)[source]
Parameters:model – Python module following the railroadtracks model.
activities

Access the activities declared by the model as attributes.

stepclasses

Steps.

stepinstances

Default instance for the steps (created from the default executables in the PATH).

class railroadtracks.easy.Project(model, wd='railroadtracks_project', db_fn=None, force_create=False, isolation_level=None, cached=False)[source]

A project, that is a directory containing data as well as a persistent storage of steps and how derived data and final results were obtained.

__init__(model, wd='railroadtracks_project', db_fn=None, force_create=False, isolation_level=None, cached=False)[source]
Parameters:
  • model – module with the definition of the model (that is the classes defining the type of tasks computed)
  • wd (str) – Name of a working directory (where all intermediate and final results will be saved).
  • db_fn (str or None) – Name of a file name for the database. If None, use the file “railroadtracks.db” in the directory specified in “wd”.
  • isolation_level – passed to the constructor for PersistentTaskGraph
Return type:

a tuple with (hortator.StepGraph, working directory as a str, file name for the database as a str)

add_task(step, assets, parameters=(), tag=1)[source]

Add a task to the project. If any of the assets’ targets is not defined, it will be defined automatically.

Parameters:
  • step
  • assets
  • parameters
  • tag – a tag (to differentiate repetitions of the exact same task)
cache

Cache for the dependency graph. This may not reflect changes made by an independent access to the project or a direct access to the persistent graph (see persistent_graph) but will be much faster and the preferred way to access the graph when there is only one concurrent access.

db_fn

Path to the database file

get_targetsofactivity(activity)[source]

Retrieve the targets of steps performing a specific activity (calls the method of the same name in the contained PersistentTaskGraph).

Parameters:activity (Enum) – an activity

get_targetsoftype(obj)[source]

Retrieve the targets having a given type (calls the method of the same name in the contained PersistentTaskGraph).

Parameters:obj (type, Object, or str) – a type, an instance, or a type name

get_task(task_id)[source]

Given a task ID, retrieve the associated task.

get_tasksoftype(taskorstep)[source]

Retrieve tasks matching the type of “taskorstep”.

The parameter ‘taskorstep’ can be either:

  • a class inheriting from StepAbstract
  • an instance of Task
  • an instance of StepAbstract

In the last two cases, the path to the executable and the version number are used to filter the tasks returned.

get_taskswithactivity(activity)[source]

Retrieve tasks matching the given activity.

iter_srcassets(task)[source]

Return the source files for a given task (calls the method of the same name in the contained PersistentTaskGraph).

Parameters:task – a Task

iter_targetassets(task)[source]

Return the target files for a given task (calls the method of the same name in the contained PersistentTaskGraph).

Parameters:task – a Task

model

Model used in the project.

newproject

Tell whether this is a new project, rather than an existing project (re)opened.

persistent_graph

Access to the persistent graph. This will be slower than accessing the cached version, but will always be accurate.

wd

Working directory. The directory in which derived data is stored.

railroadtracks.easy.call_factory(project, step_concrete_id, stepobj, assets, parameters=())[source]
Parameters:
  • stepobj (instance of StepAbstract) – Step object
  • assets (instance of AssetsStep) – assets
Return type:

function

Troubleshooting

The standard Python module logging is used for logging; see its documentation for details. For example, a very simple way to activate logging at the DEBUG level:

import railroadtracks
logger = railroadtracks.logger
logger.setLevel(railroadtracks.logging.DEBUG)
railroadtracks.basicConfig('/tmp/stdout')
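For comparison, the same pattern using only the stdlib logging module (generic Python, not specific to the package; the logger name is illustrative):

```python
import logging

logger = logging.getLogger('railroadtracks')
logger.setLevel(logging.DEBUG)
# log to stderr; use logging.FileHandler(path) to log to a file instead
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
logger.addHandler(handler)
logger.debug('logging is active')
```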