Creating a project¶
Railroadtracks is a toolkit designed to help the handling of computational tasks when: - the tasks have a relatively high rate of failure because running the underlying software is brittle (resources such as RAM or time not known beforehand, spurious crash) - the tasks can be iteratively added (the results of tasks can decide what is next) - the programmatic construction of pipelines can be desirable
This ipython notebook is a self-contained tutorial to show some its features.
With railroadtracks, projects are typically contained in a directory on the file system: this will contain a database describing the dependency graph as well as the results for all tasks.
We will use the following directory for this tutorial (it can be changed to a different location if needed):
tmpdir = '/tmp/rrt_tutorial' # this directory will be erased !
The ”!” will run a shell command. With this we make sure that we can start cleanly.
! rm -rf $tmpdir
In addition to the directory, a project will require a model module.
This is a python module in which classes to represent the tasks are
declared or imported from other module. Here we use the model for
RNA-Seq included with railroadtracks
.
# import the model for RNA-Seq
from railroadtracks import rnaseq
The package is designed with interactive (REPL) programming in mind. Error messages should be relatively explicit, and lead the user to a quick resolution of problems.
For example, trying to create a project now will end with an error.
import os
from railroadtracks import easy
model = rnaseq
# try/except to allow a fully automated evaluation of the notebook
try:
project = easy.Project(model,
wd = os.path.join(tmpdir, 'project'))
except Exception as e:
print('Error: %s' % str(e))
Error: The working directory '/tmp/rrt_tutorial/project' should be a directory
The issue can be fixed by creating the project directory.
! mkdir -p $tmpdir/project
This time the creation of a project is working.
project = easy.Project(model,
wd = os.path.join(tmpdir, 'project'))
Displaying a project¶
Railroadtracks objects often have a custom string representation. For example, we can print our project:
print(project)
Status: New "railroadtracks" project
Working directory: /tmp/rrt_tutorial/project
Storage
Available: 31.00 GB
Total: 39.00 GB
Tasks
Total: 0
--
Activity Progress
--
Result Type Progress
HTML display is also available for some of the objects, and this can be used in the IPython notebook.
from IPython.display import display
import railroadtracks.ipython as rsip
rsip.init_printing()
Our project can now be “displayed” in the notebook.
display(project)
New "railroadtracks" project | |
---|---|
Working directory | |
/tmp/rrt_tutorial/project | |
Disk space | 4.00GB (total: 39.00GB) |
Tasks |
Activity | Progress |
---|
Result Type | Progress |
---|
Working with model environments¶
A model environment is a convenience class that exposes the content of a model (see the creation of a project above) in a form convenient for writing recipes (that is sequence of steps), with a special attention to interactive use (for example with ipython).
Creating a working environment from a model is meant to be as easy as creating a project.
Note
The attentive reader will note that an other model could be written and
used (either as a module like rnaseq
currently is, or as an object)
env = easy.Environment(rnaseq)
/usr/local/lib/python2.7/site-packages/railroadtracks/model/quantify.py:194: UserWarning: Model SailfishQuant is deprecated. Please use SalmonAlignQuant.
warnings.warn('Model SailfishQuant is deprecated. Please use SalmonAlignQuant.')
/usr/local/lib/python2.7/site-packages/railroadtracks/model/aligners.py:160: UserWarning: Model SailfishIndex is deprecated. Please use SalmonIndex.
warnings.warn('Model SailfishIndex is deprecated. Please use SalmonIndex.')
In this example, we will build - a bowtie index for a reference genome - align reads in FASTQ files using that index
The package is shipping with the public sequence of a phage, and will use this as a conveniently small toy genome.
import railroadtracks.model.simulate
reference_fn = railroadtracks.model.simulate.PHAGEFASTA
We also need an indexer (indexing part of an aligner).
The working environment we created contains an attribute activities that
acts a namespace of activiyies declared in the model. Writing
env.activities and hitting <tab>
on ipython would list the options
available.
Here we select bowtie2.
bowtie2index = env.activities.INDEX.bowtie2build
Aligning reads using bowtie2¶
The object bowtie2index
is a “step” that can perform the indexing of
reference genome. The embodiment of that step into a task, that is the
application of the step to a specific set of input file requires the
definition of the associated assets.
The assets are basically the input files (source assets) and the output files (target assets).
Each step contains a nested class Assets
, itself with a nested class
Source
and a nested class Target
. These should be used to
construct our assets.
try:
source = bowtie2index.Assets.Source(reference_fn)
except Exception as e:
print('Error: %s' % str(e))
Error: The object "'/usr/local/lib/python2.7/site-packages/railroadtracks/EF204940.fa'" was expected to be of type <class 'railroadtracks.model.files.FASTAFile'> (but is a <type 'str'>)
Again, error messages are meant to be rather explicit. Here the model is typing the input assets (which allows early consistency checks). The error disappears when the required class is used.
reference_fasta = rnaseq.FASTAFile(reference_fn)
source = bowtie2index.Assets.Source(reference_fasta)
display(source)
Name | Class | |
---|---|---|
reference | <class 'railroadtracks.model.files.FASTAFile'> | ("defined") |
When thinking about a task, we let one focus on the input for the task while deferring the definition of the output (where the files will be stored) to a later stage (and even have it handled for you).
Target assets can be created as “undefined”.
target = bowtie2index.Assets.Target.createundefined()
display(target)
Name | Class | |
---|---|---|
indexfilepattern | <class 'railroadtracks.model.aligners.SavedBowtie2Index'> | ("undefined") |
Building the complete set of assets is done by combining the source and target assets.
assets_index = bowtie2index.Assets(source, target)
Whenever the target assets are undefined, one can implicitly mean it by
using Python’s None
or omitting the target altogether:
assets_index = bowtie2index.Assets(source, target=None)
# or
assets_index = bowtie2index.Assets(source)
With both a step and assets, we can now create a task:
Note that the task is labelled as “To do”...
task = project.add_task(bowtie2index, assets_index)
display(task)
ID | 1 | |||||||
Info | (1, u'To do', 1434847392.997949) | |||||||
Step | Bowtie2Build | |||||||
Source |
|
|||||||
Target |
|
|||||||
Parameters | () |
task.info
(1, u'To do', 1434847392.997949)
task.task_id
1
Note that the target is now fully defined. This was performed automatically by the framework.
display(assets_index)
<railroadtracks.model.aligners.Assets at 0x7f31e411be50>
Thanks to the persistent layer, adding tasks can be done without having to explicitly keep track of the operations already performed. Identical tasks will be identified as such and a unique ID is assigned to them.
To demonstrate this, we add the same task again (with an undefined target). The result is the same ID (1), and the attribute new tells that this ID is not new.
assets_again = bowtie2index.Assets(bowtie2index.Assets.Source(rnaseq.FASTAFile(reference_fn)))
task = project.add_task(bowtie2index, assets_again)
display(task)
ID | 1 | |||||||
Info | (1, u'To do', 1434847392.997949) | |||||||
Step | Bowtie2Build | |||||||
Source |
|
|||||||
Target |
|
|||||||
Parameters | () |
Since our model provides a relatively unified interface for steps, building the indexes for several tools can be done easily in a loop:
listofindexers = (env.activities.INDEX.bwaindex,
env.activities.INDEX.bowtie2build)
# list of tasks
listoftasks = list()
for indexer in listofindexers:
# create assets for the indexer
assets_index = indexer.Assets(indexer.Assets.Source(reference_fasta))
# add to the list of tasks
listoftasks.append(project.add_task(indexer, assets_index))
Note that our little project is now growing:
display(project)
New "railroadtracks" project | |
---|---|
Working directory | |
/tmp/rrt_tutorial/project | |
Disk space | 4.00GB (total: 39.00GB) |
Tasks | |
To do | 2/2 |
Activity | Progress |
---|---|
Index | 0/2 |
Result Type | Progress |
---|---|
SavedBowtie2Index | 0/1 |
BWAIndexFiles | 0/1 |
Task execution¶
Executing tasks can be done manually with:
for task in listoftasks:
try:
task.execute()
# set the status to "done"
task.status = easy._TASK_DONE
except:
# set the status to "failed"
task.status = easy._TASK_FAILED
display(project)
New "railroadtracks" project | |
---|---|
Working directory | |
/tmp/rrt_tutorial/project | |
Disk space | 4.00GB (total: 39.00GB) |
Tasks | |
Done | 2/2 |
Activity | Progress |
---|---|
Index | 2/2 |
Result Type | Progress |
---|---|
SavedBowtie2Index | 1/1 |
BWAIndexFiles | 1/1 |
for task in listoftasks:
task.reset()
display(project)
New "railroadtracks" project | |
---|---|
Working directory | |
/tmp/rrt_tutorial/project | |
Disk space | 4.00GB (total: 39.00GB) |
Tasks | |
To do | 2/2 |
Activity | Progress |
---|---|
Index | 0/2 |
Result Type | Progress |
---|---|
SavedBowtie2Index | 0/1 |
BWAIndexFiles | 0/1 |
ts_index = easy.TaskSet()
for task in listoftasks:
ts_index.add(task)
However, railroadtracks is also providing a level of abstraction to execute sets of tasks, separating the declaration of tasks from where they will be running.
Task mappers are objects that implement a method map
that can take a
TaskSet
and run all the tasks it contains.
One such mapper provided with railroadtracks
is a naive mapper that
executes tasks iteratively and in the current process.
engine = easy.IterativeExecution()
engine.map(ts_index)
Counter({'Done': 2})
An other mapper provided is using multiple processes:
for task in ts_index:
task.reset()
# multiprocessing with 2 parallel processes
NPROCS = 2
engine = easy.MultiprocessingExecution(NPROCS)
engine.map(ts_index)
Counter({'Done': 2})
We are also providing a mapper for SGE’s qsub and have implemented an in-house one for an Hadoop cluster. The key point is that the description of the workflow is independent of the type of infrastructure the tasks will be executed on.
The graph in the dependency graph¶
Our dependency graph can be turned into a networkx
DiGraph
object.
from railroadtracks.easy.taskgraph import TaskGraph
taskgraph = TaskGraph(project)
We are also providing a utility to display the dependency graph.
display(taskgraph)
Now let’s add more tasks: the aligment of FASTQ files and the counts for the annotated features.
We assume that the file names for the sequencing data are contained in a table:
import pandas
dataf_fq = pandas.DataFrame({'sample_n': tuple('sample_%i' % i for i in range(1, 4)),
'read1_fn': tuple('sample_%i_1.fq' % i for i in range(1, 4)),
'read2_fn': tuple('sample_%i_2.fq' % i for i in range(1, 4))})
import os, tempfile
tmpdir = tempfile.mkdtemp()
with open(reference_fn) as fasta_fh:
reference = next(railroadtracks.model.simulate.readfasta_iter(fasta_fh))
for idx, row in dataf_fq.iterrows():
with open(os.path.join(tmpdir, row['read1_fn']), 'w') as read1_fh, \
open(os.path.join(tmpdir, row['read2_fn']), 'w') as read2_fh:
railroadtracks.model.simulate.randomPEreads(read1_fh, read2_fh, reference)
Adding aligment tasks is then as simple as looping through them:
task_index = project.add_task(bowtie2index,
bowtie2index.Assets(bowtie2index.Assets.Source(reference_fasta)))
bowtie2 = env.activities.ALIGN.bowtie2
Assets = bowtie2.Assets
FASTQ = rnaseq.FASTQPossiblyGzipCompressed
ts_align = easy.TaskSet()
for idx, row in dataf_fq.iterrows():
assets_align = Assets(Assets.Source(task_index.assets.target.indexfilepattern,
FASTQ(os.path.join(tmpdir, row['read1_fn'])),
FASTQ(os.path.join(tmpdir, row['read2_fn']))))
task = project.add_task(bowtie2, assets_align)
ts_align.add(task)
Adding the quantification tasks is equally easy:
htseqcount = env.activities.QUANTIFY.htseqcount
Assets = htseqcount.Assets
ts_quantify = easy.TaskSet()
for task_align in ts_align:
assets_quantify = Assets(Assets.Source(task_align.assets.target.alignment,
rnaseq.GFFFile(railroadtracks.model.simulate.PHAGEGFF)))
task_quantify = project.add_task(htseqcount, assets_quantify,
parameters = tuple(['--will-fail']))
ts_quantify.add(task_quantify)
columnmerger = env.activities.UTILITY.columnmerger
Assets = columnmerger.Assets
filestomerge = tuple(rnaseq.CSVFile(x.assets.target.counts.name) for x in ts_quantify)
assets = Assets(Assets.Source(rnaseq.CSVFileSequence(filestomerge)))
task_merge_into_table = project.add_task(columnmerger, assets, parameters=(0,1))
taskgraph = TaskGraph(project)
from IPython.display import SVG
display(SVG(rsip.svg_digraph(taskgraph.digraph, graph_size="8,6")))
Having declared tasks and grouped them in sets of tasks that can be parallelized is all that is needed:
from railroadtracks.easy.tasksetgraph import TaskSetGraph
engine = easy.IterativeExecution()
tsg = TaskSetGraph(project=project, defaultmapper=engine)
# add them in reverse order to show that the execution plan
# does not depend on the order they are entered.
tsg.add(ts_quantify)
tsg.add(ts_align)
# individual tasks can also be added
tsg.add(task_merge_into_table)
# execute all tasks, parallelized with task sets and using
# the default mapper (execution engine)
tsg.execute()
/usr/local/lib/python2.7/site-packages/railroadtracks/model/quantify.py:102: UserWarning: The source asset 'alignedreads' should ideally be sorted by read IDs. We are sorting the file; use explicitly a 'BAMFileSortedByID' rather than a 'BAMFile' for better performances, as well as for reproducibility issues (the sorting will use whatever 'samtools` is first found in the PATH)
% ("alignedreads", BAMFileSortedByID.__name__, BAMFile.__name__))
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_8/tmpfGPiNi.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_6/tmpLiO51b.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_7/tmpxgWvY_.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
Unfortunately, while the alignment tasks were successful the quantification tasks were not:
display(project)
New "railroadtracks" project | |
---|---|
Working directory | |
/tmp/rrt_tutorial/project | |
Disk space | 4.00GB (total: 39.00GB) |
Tasks | |
Failed | 3/9 |
Done | 5/9 |
To do | 1/9 |
Activity | Progress |
---|---|
Index | 2/2 |
Align | 3/3 |
Quantify | 0/3 |
Utility | 0/1 |
Result Type | Progress |
---|---|
BWAIndexFiles | 1/1 |
CSVFile | 0/4 |
We can visualize this on a graph:
taskgraph = TaskGraph(project)
display(SVG(rsip.svg_digraph(taskgraph.digraph,
graph_size="8,6")))
Investigating the source of error is straightforward. Here the all quantification have failed, so we just pick one in the set and try to execute it.
task = next(iter(ts_quantify))
# we could also look at the task number (ID) on the graph and do:
# taskid = 10
# task = project.get_task(task_id)
try:
task.execute()
except Exception as e:
print(e)
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_6/tmprIM9MK.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
Command '['samtools', 'view', '/tmp/rrt_tutorial/project/step_6/tmprIM9MK.bam', '|', 'htseq-count', '--will-fail', '-', '/usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff']' returned non-zero exit status 2
The cause of the failure is an invalid parameter, which we correct in a new task set.
htseqcount = env.activities.QUANTIFY.htseqcount
Assets = htseqcount.Assets
ts_quantify = easy.TaskSet()
for task_align in ts_align:
assets_quantify = Assets(Assets.Source(task_align.assets.target.alignment,
rnaseq.GFFFile(railroadtracks.model.simulate.PHAGEGFF)))
task_quantify = project.add_task(htseqcount, assets_quantify,
parameters = htseqcount._noexons_parameters)
ts_quantify.add(task_quantify)
Assets = columnmerger.Assets
filestomerge = tuple(rnaseq.CSVFile(x.assets.target.counts.name) for x in ts_quantify)
assets = Assets(Assets.Source(rnaseq.CSVFileSequence(filestomerge)))
task_merge_into_table = project.add_task(columnmerger, assets, parameters=(0,1))
The task set is then added to our task set graph.
tsg.add(ts_quantify)
tsg.add(task_merge_into_table)
tsg.execute()
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_8/tmpHIGed3.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_6/tmpkj1guf.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
ERROR:railroadtracks.model.quantify:samtools view /tmp/rrt_tutorial/project/step_7/tmpY4aGoE.bam | htseq-count --will-fail - /usr/local/lib/python2.7/site-packages/railroadtracks/ef204940.gff
The new parameters allow us to succesfully run all tasks and obtain a table of counts.
This time we are using the additional grouping of tasks in the
TaskSetGraph
tsg when displaying the dependency graph:
taskgraph = TaskGraph(project)
display(SVG(rsip.svg_tasksetgraph_view(tsg, taskgraph, graph_size="8,6")))
More task can be added, such as for example extracting unaligned reads from the BAM files and producing FASTQ files containing them.
SamtoolsExtractUnaligned = railroadtracks.model.aligners.SamtoolsExtractUnaligned
taskset_unaligned = easy.TaskSet()
bamtofastq = railroadtracks.model.files.BedtoolsBamToFastqPE()
taskset_backtofq = easy.TaskSet()
for task_align in ts_align:
# extract unaligned reads
Assets = SamtoolsExtractUnaligned.Assets
assets = Assets(Assets.Source(task_align.assets.target.alignment))
task_extract = project.add_task(SamtoolsExtractUnaligned(), assets)
taskset_unaligned.add(task_extract)
# convert to FASTQ
Assets = bamtofastq.Assets
assets = Assets(Assets.Source(task_extract.assets.target.unaligned))
taskset_backtofq.add(project.add_task(bamtofastq, assets))
tsg.add(taskset_unaligned)
tsg.add(taskset_backtofq)
taskgraph = TaskGraph(project)
display(SVG(rsip.svg_tasksetgraph_view(tsg, taskgraph, graph_size="8,6")))
Building a full pipeline¶
Adding additional alternative processing is then only a matter of nesting loops. If we want to expand on our initial consideration to use BWA, say for comparison purposes, and build the same tasks for it is done in very few additional lines of code.
Building blocks for the pipeline¶
Four functions can be written.
1/4: Align the reads¶
def align_reads(task_index, aligner, dataf_fq, project):
# align the reads
taskset_align = easy.TaskSet()
Assets = aligner.Assets
FASTQ = rnaseq.FASTQPossiblyGzipCompressed
for idx, row in dataf_fq.iterrows():
assets_align = Assets(Assets.Source(task_index.assets.target.indexfilepattern,
FASTQ(os.path.join(tmpdir, row['read1_fn'])),
FASTQ(os.path.join(tmpdir, row['read2_fn']))))
task = project.add_task(aligner, assets_align)
taskset_align.add(task)
return taskset_align
2/4: Quantify the alignments¶
def quantify_reads(taskset_align, project, additional_parameters = ()):
# quantify the alignments
htseqcount = env.activities.QUANTIFY.htseqcount
parameters = list(htseqcount._noexons_parameters)
parameters.extend(additional_parameters)
Assets = htseqcount.Assets
taskset_quantify = easy.TaskSet()
for task_align in taskset_align:
assets_quantify = Assets(Assets.Source(task_align.assets.target.alignment,
rnaseq.GFFFile(railroadtracks.model.simulate.PHAGEGFF)))
task_quantify = project.add_task(htseqcount, assets_quantify,
parameters = parameters)
taskset_quantify.add(task_quantify)
return taskset_quantify
3/4: Build a table of counts¶
def build_table_of_counts(taskset_quantify, project):
# merge the alignments into a table of counts
Assets = columnmerger.Assets
filestomerge = tuple(rnaseq.CSVFile(x.assets.target.counts.name) for x in taskset_quantify)
assets = Assets(Assets.Source(rnaseq.CSVFileSequence(filestomerge)))
task_merge = project.add_task(columnmerger, assets, parameters=(0,1))
return task_merge
4/4: Save unaligned reads into FASTQ files¶
def unaligned_to_FASTQ(taskset_align, project):
SamtoolsExtractUnaligned = railroadtracks.model.aligners.SamtoolsExtractUnaligned
bamtofastq = railroadtracks.model.files.BedtoolsBamToFastqPE()
taskset_unaligned = easy.TaskSet()
taskset_backtofq = easy.TaskSet()
for task_align in taskset_align:
# extract unaligned reads
Assets = SamtoolsExtractUnaligned.Assets
assets = Assets(Assets.Source(task_align.assets.target.alignment))
task_extract = project.add_task(SamtoolsExtractUnaligned(), assets)
taskset_unaligned.add(task_extract)
# convert to FASTQ
Assets = bamtofastq.Assets
assets = Assets(Assets.Source(task_extract.assets.target.unaligned))
taskset_backtofq.add(project.add_task(bamtofastq, assets))
return (taskset_unaligned, taskset_backtofq)
Assembly of the pipeline¶
With the four short functions defined above, a workflow that aligns reads with either bowtie2 or BWA and for each aligner saves the unaligned reads into FASTQ files and quantifies the aligments can be written in very few lines of code.
One can note that the workflow is also quite generic. It is only
depending on reference_fasta
, the FASTA file with the reference
genome, and on dataf_fq
, a pandas DataFrame
that contains the
file names for the FASTQ files.
os.mkdir(os.path.join(tmpdir, 'project2'))
project2 = easy.Project(rnaseq,
wd = os.path.join(tmpdir, 'project2'))
# pairs of indexer/aligner
listofindexers = ((env.activities.INDEX.bwaindex, env.activities.ALIGN.bwa),
(env.activities.INDEX.bowtie2build, env.activities.ALIGN.bowtie2))
# additional parameters for the quantifier
quant_params = (('-m', 'union'),
('-m', 'intersection-strict'),
('-m', 'intersection-nonempty'))
tsg2 = TaskSetGraph(defaultmapper=engine)
taskset_index = easy.TaskSet()
# loop through the indexers
for indexer, aligner in listofindexers:
# create assets for the indexer
assets_index = indexer.Assets(indexer.Assets.Source(reference_fasta))
# add to the list of tasks
task_index = project2.add_task(indexer, assets_index)
taskset_index.add(task_index)
# align the reads
taskset_align = align_reads(task_index, aligner, dataf_fq, project2)
tsg2.add(taskset_align)
for additional_parameters in quant_params:
# quantify the alignments
taskset_quantify = quantify_reads(taskset_align, project2,
additional_parameters=additional_parameters)
tsg2.add(taskset_quantify)
# merge the counts into a table
task_merge = build_table_of_counts(taskset_quantify, project2)
tsg2.add(task_merge)
# extract the unaligned read into FASTQ files
taskset_unaligned, taskset_backtofq = unaligned_to_FASTQ(taskset_align, project2)
tsg2.add(taskset_unaligned)
tsg2.add(taskset_backtofq)
# add the set of indexing tasks
# (order of addition to the TaskSetGraph does not matter)
tsg2.add(taskset_index)
Running the pipeline is done as simply as before:
tsg2.execute()
Visualizing the pipeline, together with the grouping of tasks for parallel execution is also done as for simpler pipelines:
taskgraph = TaskGraph(project2)
display(SVG(rsip.svg_tasksetgraph_view(tsg2, taskgraph, graph_size="10,8")))
Handling results (target assets)¶
Results from one task¶
Let’s assume that the table of count is our primary interest.
The Task
“task_merge_into_table” we created earlier for our first
project is all we need.
dataf_counts = pandas.read_csv(task_merge_into_table.assets.target.counts.name, engine="python")
display(dataf_counts)
ID | count | count.1 | count.2 | |
---|---|---|---|---|
0 | GI:146150139 | 99 | 110 | 91 |
1 | GI:146150140 | 25 | 30 | 21 |
2 | GI:146150141 | 0 | 0 | 0 |
3 | GI:146150142 | 142 | 123 | 139 |
4 | __no_feature | 2 | 2 | 4 |
5 | __ambiguous | 32 | 35 | 45 |
6 | __too_low_aQual | 0 | 0 | 0 |
7 | __not_aligned | 0 | 0 | 0 |
8 | __alignment_not_unique | 0 | 0 | 0 |
It is also possible to look for tasks with the help of helping functions
all_merge = project.get_tasksoftype(rnaseq.ColumnMerger)
all_success_merge = all_merge.filter_on_status(easy.TASK_DONE)
# one task left. That's our task_merge
for task in all_success_merge:
display(task)
ID | 13 | |||||||
Info | (5, u'Done', 1434847399.199475) | |||||||
Step | ColumnMerger | |||||||
Source |
|
|||||||
Target |
|
|||||||
Parameters | [0, 1] |
Pre-analysis settings such as fixing sample name is, again, straightforward (the order of is the one in the data frame)
new_names = dict(zip(dataf_counts.columns.values[1:],
tuple(dataf_fq['sample_n'])))
dataf_counts.rename(columns = new_names)
dataf_counts.columns.values[1:] = dataf_fq['sample_n']
display(dataf_counts)
ID | sample_1 | sample_2 | sample_3 | |
---|---|---|---|---|
0 | GI:146150139 | 99 | 110 | 91 |
1 | GI:146150140 | 25 | 30 | 21 |
2 | GI:146150141 | 0 | 0 | 0 |
3 | GI:146150142 | 142 | 123 | 139 |
4 | __no_feature | 2 | 2 | 4 |
5 | __ambiguous | 32 | 35 | 45 |
6 | __too_low_aQual | 0 | 0 | 0 |
7 | __not_aligned | 0 | 0 | 0 |
8 | __alignment_not_unique | 0 | 0 | 0 |
Flattening the provenance is relatively easy with this example. This can be helpful for building a table with information about how any given result was obtained.
# starting with the result of merging the result of quantification into a table of counts
result = tuple(task_merge_into_table.iter_targetassets())[0]
tg = TaskGraph(project)
tasklist = tg.digraph.predecessors(easy.taskgraph.Node('Asset', result.id.id))
taskset = easy.TaskSet()
while len(tasklist) > 0:
t = tasklist.pop()
tasklist.extend(tt for tt in tg.predecessor_tasks(t))
taskset.add(tg.digraph.node[t]['instance'])
set(task.call.step._name for task in taskset)
{'bowtie2', 'bowtie2-build', 'column-merge', 'htseqcount'}
If individual information about the intermediate files is wanted, flattening the graph into a table could be done with the following function:
def provenance_table(taskgraph, task_merge):
result = tuple(task_merge.iter_sourceassets())[0]
node = taskgraph.digraph.node[easy.taskgraph.Node('AssetSequence', result.id.id)]
assetsequence_ids = tuple(node['instance'])
ld = list()
for idx, asset_id in enumerate(assetsequence_ids):
tasklist = taskgraph.digraph.predecessors(easy.taskgraph.Node('Asset', asset_id))
taskset = easy.TaskSet()
d = dict()
d['idx'] = idx
while len(tasklist) > 0:
t = tasklist.pop()
tasklist.extend(tt for tt in taskgraph.predecessor_tasks(t))
this_task = taskgraph.digraph.node[t]['instance']
taskset.add(this_task)
for task in taskset:
label = ' '.join(x.value for x in task.call.step.activities)
d[label + '-taskid'] = task.task_id
d[label] = task.call.step._name
d[label + '-params'] = ' '.join(task.call.parameters)
ld.append(d)
dataf_provenance = pandas.DataFrame.from_records(ld,)
return dataf_provenance
tg = TaskGraph(project)
dataf_provenance = provenance_table(tg, task_merge_into_table)
display(dataf_provenance)
Align | Align-params | Align-taskid | Index | Index-params | Index-taskid | Quantify | Quantify-params | Quantify-taskid | idx | |
---|---|---|---|---|---|---|---|---|---|---|
0 | bowtie2 | 3 | bowtie2-build | 1 | htseqcount | --type=CDS --idattr=db_xref --stranded=no | 10 | 0 | ||
1 | bowtie2 | 4 | bowtie2-build | 1 | htseqcount | --type=CDS --idattr=db_xref --stranded=no | 11 | 1 | ||
2 | bowtie2 | 5 | bowtie2-build | 1 | htseqcount | --type=CDS --idattr=db_xref --stranded=no | 12 | 2 |
Since the order of sample used from the begining is the one in
dataf_fq
, making a table with sample data and provenance is
straightforward:
display(dataf_provenance.join(dataf_fq))
Align | Align-params | Align-taskid | Index | Index-params | Index-taskid | Quantify | Quantify-params | Quantify-taskid | idx | read1_fn | read2_fn | sample_n | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | bowtie2 | 3 | bowtie2-build | 1 | htseqcount | --type=CDS --idattr=db_xref --stranded=no | 10 | 0 | sample_1_1.fq | sample_1_2.fq | sample_1 | ||
1 | bowtie2 | 4 | bowtie2-build | 1 | htseqcount | --type=CDS --idattr=db_xref --stranded=no | 11 | 1 | sample_2_1.fq | sample_2_2.fq | sample_2 | ||
2 | bowtie2 | 5 | bowtie2-build | 1 | htseqcount | --type=CDS --idattr=db_xref --stranded=no | 12 | 2 | sample_3_1.fq | sample_3_2.fq | sample_3 |
Handling several alternative results¶
Extracting results from alternative processing routes integrated into one pipeline is also quite easy.
First we execute the tasks in our second project (in case we have not done it before - executing the tasks will not be attempted if previously successful).
tsg2.execute()
Build a data frame that contains the results from all alternatives is straightforward:
res = list()
for task in project2.get_tasksoftype(rnaseq.ColumnMerger):
dataf_counts = pandas.read_csv(task.assets.target.counts.name, engine="python")
# add the task ID that produced that table. the task ID can be used to recover anything about the
# task or its provenance.
dataf_counts['task_id'] = pandas.Series(task.task_id, index=dataf_counts.index)
res.append(dataf_counts)
dataf_counts = pandas.concat(res)
Here we observe the difference on the number of reads not mapping to any feature across the alternative pipelines:
criterion = dataf_counts['ID'].map(lambda x: x == '__no_feature')
display(dataf_counts.loc[criterion])
ID | count | count.1 | count.2 | task_id | |
---|---|---|---|---|---|
4 | __no_feature | 2 | 2 | 4 | 8 |
4 | __no_feature | 300 | 300 | 300 | 12 |
4 | __no_feature | 12 | 11 | 14 | 16 |
4 | __no_feature | 2 | 2 | 4 | 30 |
4 | __no_feature | 300 | 300 | 300 | 34 |
4 | __no_feature | 12 | 11 | 14 | 38 |
Linking a task ID to its provenance seen above in the example with one alternative would be similar in this case.
tg = TaskGraph(project2)
res = list()
for task in project2.get_tasksoftype(rnaseq.ColumnMerger):
dataf_provenance = provenance_table(tg, task)
dataf_provenance['task_id'] = pandas.Series(task.task_id, index=dataf_provenance.index)
res.append(dataf_provenance)
dataf_provenance = pandas.concat(res)
dataf_provenance = dataf_provenance[['Align', 'Quantify', "Quantify-params", 'task_id']]
dataf_provenance = dataf_provenance.drop_duplicates()
dataf = pandas.merge(dataf_counts.loc[criterion], dataf_provenance, on='task_id', how='inner')
display(dataf)
ID | count | count.1 | count.2 | task_id | Align | Quantify | Quantify-params | |
---|---|---|---|---|---|---|---|---|
0 | __no_feature | 2 | 2 | 4 | 8 | bwa-align | htseqcount | --type=CDS --idattr=db_xref --stranded=no -m u... |
1 | __no_feature | 300 | 300 | 300 | 12 | bwa-align | htseqcount | --type=CDS --idattr=db_xref --stranded=no -m i... |
2 | __no_feature | 12 | 11 | 14 | 16 | bwa-align | htseqcount | --type=CDS --idattr=db_xref --stranded=no -m i... |
3 | __no_feature | 2 | 2 | 4 | 30 | bowtie2 | htseqcount | --type=CDS --idattr=db_xref --stranded=no -m u... |
4 | __no_feature | 300 | 300 | 300 | 34 | bowtie2 | htseqcount | --type=CDS --idattr=db_xref --stranded=no -m i... |
5 | __no_feature | 12 | 11 | 14 | 38 | bowtie2 | htseqcount | --type=CDS --idattr=db_xref --stranded=no -m i... |