Cookbook¶
Environment¶
Checking that 3rd-party executables are present¶
When writing a recipe, it is often desirable to check that 3rd-party executables are present, so that a script fails early and gives the user an informative message about what is required but missing.
from railroadtracks.environment import Executable, MissingSoftware
if not Executable.ispresent('bwa'):
    # fail early, before any task is created or run
    raise MissingSoftware("The executable 'bwa' is not in the PATH.")
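The same fail-early idea can be sketched with the standard library alone, which may help when several tools must be checked at once. This is a minimal illustration that does not use railroadtracks: `find_missing` and the list of tool names are hypothetical, and `shutil.which` stands in for `Executable.ispresent`.

```python
import shutil


def find_missing(executables):
    """Return the names in `executables` that cannot be found in the PATH."""
    return [name for name in executables if shutil.which(name) is None]


# Hypothetical list of tools required by a recipe.
required = ["bwa", "samtools"]
missing = find_missing(required)
if missing:
    # Report every missing tool at once rather than stopping at the first one.
    print("Missing from the PATH: " + ", ".join(missing))
```

Collecting all missing names before reporting lets a user install everything in one pass instead of rerunning the script after each failure.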
Project¶
Get all tasks of a given type¶
The types, or classes, defined in a model can be used to query a project.
taskset = project.get_tasksoftype(rnaseq.StarAlign)
Get all tasks matching a list of types¶
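from functools import reduce  # reduce is no longer a builtin in Python 3
listoftypes = (rnaseq.StarAlign, rnaseq.BWA)
# start from an empty TaskSet and accumulate the union, one type at a time
taskset = reduce(lambda x, y: x.union(project.get_tasksoftype(y)),
                 listoftypes, easy.TaskSet())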
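The `reduce` pattern above folds one set per type into a single set. The sketch below shows the same fold on built-in `frozenset` objects used as stand-ins for `TaskSet`; the dictionary `tasks_by_type` and its contents are made up for illustration and are not part of railroadtracks.

```python
from functools import reduce

# Hypothetical stand-in for a project: task names grouped by type name.
tasks_by_type = {
    "StarAlign": frozenset({"star-1", "star-2"}),
    "BWA": frozenset({"bwa-1"}),
    "Other": frozenset({"other-1"}),
}

listoftypes = ("StarAlign", "BWA")

# Start from an empty set and accumulate the union, one type at a time.
taskset = reduce(lambda acc, t: acc.union(tasks_by_type[t]),
                 listoftypes, frozenset())
print(sorted(taskset))
```

Passing the empty set as the initial value keeps the fold well defined even when `listoftypes` is empty.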
Get all tasks fulfilling a given activity¶
taskset = project.get_taskswithactivity(rnaseq.ACTIVITY.ALIGN)
TaskSet¶
TaskSet objects provide a convenient abstraction to manipulate groups of tasks by thinking about them as set objects. In addition to set methods, they also have railroadtracks-specific features.
Subset all tasks with a given status¶
Retrieving all tasks with a given status from a TaskSet can be achieved with the method TaskSet.filter_on_status().
For example:
# all tasks in the set that failed
taskset_failed = taskset.filter_on_status(hortator._TASK_FAILED)
# all tasks in the set that succeeded
taskset_success = taskset.filter_on_status(hortator._TASK_DONE)
Subset all tasks ready for execution¶
Tasks ready for execution have all their /parent/ tasks with the status _TASK_DONE and themselves have the status _TASK_TODO or _TASK_FAILED.
This is implemented very simply, using TaskSet.filter_on_status(), TaskSet.union(), and TaskSet.filter_on_parent_status().
def exec_taskfilter(taskset):
    # union of tasks that are either "TO-DO" or "FAILED"
    tasks_todo = taskset.filter_on_status(hortator._TASK_TODO)
    tasks_todo = tasks_todo.union(taskset.filter_on_status(hortator._TASK_FAILED))
    # only keep the tasks for which the parent task was successfully run
    tasks_todo = tasks_todo.filter_on_parent_status(hortator._TASK_DONE)
    return tasks_todo
That function is part of the code base, so in practice one will only have to write:
import railroadtracks.easy.execution
ts_torun = railroadtracks.easy.execution.exec_taskfilter(taskset)
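The rule behind the filter can be illustrated on a toy model that does not depend on railroadtracks. Everything here is hypothetical: `ToyTask`, the status labels, and `ready_for_execution` only mimic the logic described above, and the sketch assumes that a task without parents counts as having all its parents done.

```python
from dataclasses import dataclass, field

# Status labels for this toy model only; they mirror the names used above.
TODO, DONE, FAILED = "TODO", "DONE", "FAILED"


@dataclass
class ToyTask:
    name: str
    status: str
    parents: list = field(default_factory=list)


def ready_for_execution(tasks):
    """Keep tasks that are TODO or FAILED and whose parents are all DONE."""
    return [t for t in tasks
            if t.status in (TODO, FAILED)
            and all(p.status == DONE for p in t.parents)]


index = ToyTask("index", DONE)
align = ToyTask("align", FAILED, parents=[index])
count = ToyTask("count", TODO, parents=[align])

ready = ready_for_execution([index, align, count])
print([t.name for t in ready])
```

Only `align` is selected: `index` is already done, and `count` has to wait because its parent has not completed successfully.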
Get all child tasks for a TaskSet¶
Each TaskSet can be manipulated as a set, so if we think of the problem as the union of the child tasks for each task in the set, it can be written simply as:
ts_allchilds = easy.TaskSet()
for task in taskset:
    # union() returns the combined set, so the result is assigned back
    ts_allchilds = ts_allchilds.union(task.child_tasks())
The one-line version is:
from functools import reduce  # reduce is no longer a builtin in Python 3
ts_allchilds = reduce(lambda x, y: x.union(y),
                      map(lambda x: x.child_tasks(), taskset),
                      easy.TaskSet())
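If `TaskSet.union()` follows the convention of Python's built-in `set` (an assumption here, consistent with how the one-line version uses its return value), the method returns a new set and leaves the original untouched, so the result has to be assigned. The sketch below uses built-in sets, with a hypothetical `children` mapping in place of `task.child_tasks()`.

```python
# Hypothetical mapping from a task name to the names of its child tasks.
children = {
    "index": {"align-1", "align-2"},
    "align-1": {"count-1"},
    "align-2": {"count-2"},
}
taskset = {"index", "align-1"}

# union() returns a new set; calling it without assignment has no effect.
discarded = set()
for task in taskset:
    discarded.union(children[task])

# Assigning the result accumulates the children as intended.
all_children = set()
for task in taskset:
    all_children = all_children.union(children[task])

# Equivalent single expression: union() accepts any number of iterables.
all_children_oneline = set().union(*(children[t] for t in taskset))
```

With built-in sets, `update()` or `|=` would modify the set in place; whether `TaskSet` offers the same is not stated here.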
TaskSetGraph¶
TaskSet objects are rarely considered in isolation, since they are part of a larger dependency graph. TaskSetGraph helps handle all the tasks by allowing one to manipulate groups of tasks that are not directly connected to each other as TaskSet objects, within the context of the task-level dependency graph. Such sets can be added directly to a TaskSetGraph, while dependencies between the task sets are inferred from the task-level relationships.
from railroadtracks.easy.tasks import TaskSet
from railroadtracks.easy.tasksetgraph import TaskSetGraph
tsg = TaskSetGraph()
# add a taskset
tsg.add(taskset_a)
# add another taskset
tsg.add(taskset_b)
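To make the inference concrete, here is a toy sketch of how dependencies between sets could be derived from task-level relationships. It is not the `TaskSetGraph` implementation: `infer_set_dependencies`, the `parents` mapping, and the task names are all hypothetical.

```python
def infer_set_dependencies(tasksets, parents):
    """Map each set name to the names of the sets it depends on.

    `tasksets` maps a set name to a set of task names, and `parents`
    maps a task name to the names of its parent tasks.
    """
    # Reverse lookup: which set does each task belong to?
    owner = {task: name for name, tasks in tasksets.items() for task in tasks}
    deps = {name: set() for name in tasksets}
    for name, tasks in tasksets.items():
        for task in tasks:
            for parent in parents.get(task, ()):
                parent_set = owner.get(parent)
                # A parent located in another set creates a set-level dependency.
                if parent_set is not None and parent_set != name:
                    deps[name].add(parent_set)
    return deps


tasksets = {
    "taskset_a": {"align-1", "align-2"},
    "taskset_b": {"count-1", "count-2"},
}
parents = {"count-1": ["align-1"], "count-2": ["align-2"]}

deps = infer_set_dependencies(tasksets, parents)
```

In this example `taskset_b` ends up depending on `taskset_a`, because each counting task has an alignment task as its parent.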
Execute all tasks¶
In order to execute tasks, one can decide on a default task mapper (task mappers can also be set at the TaskSet level) and on a task filter (a function that decides which tasks should be executed).
The order in which TaskSet objects should be executed is inferred from the connections between the Tasks in each TaskSet and does not require a user's intervention. The task filter provides an additional level of refinement by letting a function control which tasks in the set are executed.
In the example below, we are using easy.execution.IterativeExecution to map the tasks, and the filter introduced earlier in the section TaskSet: only consider for execution the tasks that are not yet done, and for which the /parent/ tasks were successfully completed (there is no point trying to execute a task if its parent has not been successful).
from railroadtracks.easy.processing import IterativeExecution
# task mapper to run the tasks
p = IterativeExecution()
tsg.defaultmapper = p
# task filter (the function from the module easy.execution shown in the section TaskSet)
tsg.defaultfilter = easy.execution.exec_taskfilter
# execute all tasks that pass the filter
tsg.execute()
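The combination of an inferred execution order and a task filter can be sketched without railroadtracks. The example below is a toy under stated assumptions, not what `TaskSetGraph.execute()` does internally: the `deps` and `statuses` dictionaries and the `todo_filter` function are hypothetical, and the ordering relies on `graphlib.TopologicalSorter` from the standard library (Python 3.9 or later).

```python
from graphlib import TopologicalSorter

# Hypothetical set-level dependencies: each key depends on the sets in its value.
deps = {
    "index": set(),
    "align": {"index"},
    "count": {"align"},
}

# Hypothetical task statuses within each set.
statuses = {
    "index": {"index-1": "DONE"},
    "align": {"align-1": "TODO", "align-2": "DONE"},
    "count": {"count-1": "TODO"},
}


def todo_filter(tasks):
    """Toy filter: keep only the tasks that are not yet done."""
    return [name for name, status in tasks.items() if status != "DONE"]


executed = []
# static_order() yields each set after all the sets it depends on.
for set_name in TopologicalSorter(deps).static_order():
    for task in todo_filter(statuses[set_name]):
        # A real mapper would run the task here; the sketch only records it.
        statuses[set_name][task] = "DONE"
        executed.append(task)
```

Tasks that are already done are skipped, and the remaining ones are visited set by set in dependency order.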