Cookbook

Environment

Checking that 3rd-party executables are present

When writing a recipe, it is often desirable to check that 3rd-party executables are present, so that a script fails early and gives the user an informative message about what is required but missing.

from railroadtracks.environment import Executable, MissingSoftware

if not Executable.ispresent('bwa'):
    raise MissingSoftware("The executable 'bwa' is not in the PATH.")
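
The same fail-early idea can be sketched with the standard library alone, which is handy when several executables must be checked at once. This is a minimal sketch assuming Python 3.3 or later; `find_missing` and `require_executables` are hypothetical helpers, not part of railroadtracks, and a plain `RuntimeError` stands in for `MissingSoftware`.

```python
import shutil


def find_missing(executables):
    """Return the names in `executables` that are not found in the PATH."""
    return [name for name in executables if shutil.which(name) is None]


def require_executables(executables):
    """Raise a RuntimeError that names every missing executable at once."""
    missing = find_missing(executables)
    if missing:
        raise RuntimeError(
            "Missing executables (not in the PATH): " + ", ".join(missing)
        )


# Example: check several tools before doing any work.
# require_executables(["bwa", "samtools"])
```

Reporting all missing tools in one message spares the user from fixing them one failed run at a time.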

Project

Get all tasks of a given type

The types, or classes, defined in a model can be used to query a project.

taskset = project.get_tasksoftype(rnaseq.StarAlign)

Get all tasks matching a list of types

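from functools import reduce  # reduce() is not a builtin in Python 3

listoftypes = (rnaseq.StarAlign, rnaseq.BWA)
# fold the task sets obtained for each type into a single set
taskset = reduce(lambda x, y: x.union(project.get_tasksoftype(y)),
                 listoftypes, easy.TaskSet())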
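
The fold above can be tried out without a project at hand. The sketch below uses plain Python sets as a stand-in for TaskSet objects; the `tasks_by_type` index and the `tasks_of_types` helper are hypothetical and only illustrate the pattern of starting from an empty set and accumulating unions.

```python
from functools import reduce

# Hypothetical stand-in for a project: task names indexed by step type.
tasks_by_type = {
    "StarAlign": {"align_sample1", "align_sample2"},
    "BWA": {"bwa_sample1"},
    "Count": {"count_sample1"},
}


def tasks_of_types(index, types):
    """Fold the per-type sets into one set, starting from an empty set."""
    return reduce(lambda acc, t: acc.union(index.get(t, set())), types, set())


selected = tasks_of_types(tasks_by_type, ("StarAlign", "BWA"))
print(sorted(selected))
```

Passing the empty set as the initial value means an empty list of types yields an empty result instead of an error.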

Get all tasks fulfilling a given activity

taskset = project.get_taskswithactivity(rnaseq.ACTIVITY.ALIGN)

TaskSet

TaskSet objects provide a convenient abstraction for manipulating groups of tasks by treating them as set objects. In addition to set methods, they also have railroadtracks-specific features.

Subset all tasks with a given status

Retrieving all tasks with a given status from a TaskSet can be achieved with the method TaskSet.filter_on_status().

For example:

# all tasks in the set that failed
taskset_failed = taskset.filter_on_status(hortator._TASK_FAILED)

# all tasks in the set that succeeded
taskset_success = taskset.filter_on_status(hortator._TASK_DONE)

Subset all tasks ready for execution

A task is ready for execution when all of its parent tasks have the status _TASK_DONE and the task itself has the status _TASK_TODO or _TASK_FAILED.

This is implemented very simply, using TaskSet.filter_on_status(), TaskSet.union(), and TaskSet.filter_on_parent_status().

def exec_taskfilter(taskset):
    # union of tasks that are either "TO-DO" or "FAILED"
    tasks_todo = taskset.filter_on_status(hortator._TASK_TODO)
    tasks_todo = tasks_todo.union(taskset.filter_on_status(hortator._TASK_FAILED))
    # only keep the tasks for which the parent task was successfully run
    tasks_todo = tasks_todo.filter_on_parent_status(hortator._TASK_DONE)
    return tasks_todo

That function is part of the code base, so in practice one will only have to write:

import railroadtracks.easy.execution
ts_torun = railroadtracks.easy.execution.exec_taskfilter(taskset)
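
The readiness rule can also be illustrated on toy data, without railroadtracks. The sketch below is an assumption-laden model: statuses are plain strings, the `tasks` dictionary is hypothetical, and a task without parents is treated as having all its parents done, which may differ from what TaskSet.filter_on_parent_status() does.

```python
TODO, DONE, FAILED = "TODO", "DONE", "FAILED"

# Hypothetical toy data: task name -> (status, names of parent tasks)
tasks = {
    "index": (DONE, []),
    "align": (TODO, ["index"]),
    "count": (TODO, ["align"]),
    "qc": (FAILED, ["index"]),
}


def ready_for_execution(tasks):
    """Return names of tasks that are TODO or FAILED and whose parents are all DONE."""
    ready = set()
    for name, (status, parents) in tasks.items():
        if status not in (TODO, FAILED):
            continue  # already done: nothing to run
        if all(tasks[p][0] == DONE for p in parents):
            ready.add(name)
    return ready


print(sorted(ready_for_execution(tasks)))
```

Here "count" is held back because its parent "align" has not run yet, while the failed "qc" task is eligible for another attempt.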

Get all child tasks for a TaskSet

Each TaskSet can be manipulated as a set, so the problem can be expressed as the union of the child tasks of each task in the set:

ts_allchilds = easy.TaskSet()
for task in taskset:
    # keep the set returned by union()
    ts_allchilds = ts_allchilds.union(task.child_tasks())

The one-line version is:

from functools import reduce  # reduce() is not a builtin in Python 3

ts_allchilds = reduce(lambda x, y: x.union(y),
                      map(lambda x: x.child_tasks(), taskset),
                      easy.TaskSet())
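
To see that the loop and the fold compute the same thing, both can be run on a toy graph built from plain Python sets. The `children` mapping and the three helper functions below are hypothetical; the third variant relies on the builtin `set.union()` accepting any number of iterables, which is a property of Python sets and is not claimed for TaskSet.

```python
from functools import reduce

# Hypothetical toy graph: task name -> set of child task names
children = {
    "index": {"align_a", "align_b"},
    "align_a": {"count_a"},
    "align_b": {"count_b"},
    "count_a": set(),
    "count_b": set(),
}


def all_children_loop(taskset):
    """Accumulate the children with an explicit loop."""
    result = set()
    for task in taskset:
        result = result.union(children[task])
    return result


def all_children_reduce(taskset):
    """Same computation, written as a fold over the child sets."""
    return reduce(lambda acc, s: acc.union(s),
                  (children[t] for t in taskset), set())


def all_children_builtin(taskset):
    """Same computation, using set.union() with several arguments."""
    return set().union(*(children[t] for t in taskset))


print(sorted(all_children_loop({"index", "align_a"})))
```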

TaskSetGraph

TaskSet objects are rarely considered in isolation, since they are part of a larger dependency graph. TaskSetGraph helps handle all the tasks by letting one manipulate groups of tasks that are not directly connected to one another as TaskSet objects, within the context of the task-level dependency graph. Such sets can be added directly to a TaskSetGraph, and the dependencies between the task sets are inferred from the task-level relationships.

from railroadtracks.easy.tasks import TaskSet
from railroadtracks.easy.tasksetgraph import TaskSetGraph

tsg = TaskSetGraph()

# add a taskset
tsg.add(taskset_a)

# add another taskset
tsg.add(taskset_b)
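
How dependencies between sets might be inferred from task-level relationships can be sketched with the standard library. This is not TaskSetGraph's implementation: the `parents` and `tasksets` data and the `infer_set_dependencies` helper are hypothetical, and the sketch assumes Python 3.9 or later for `graphlib`.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical toy data: task name -> names of parent tasks
parents = {
    "index": [],
    "align_1": ["index"],
    "align_2": ["index"],
    "count_1": ["align_1"],
    "count_2": ["align_2"],
}

# Task sets, as named groups of task names
tasksets = {
    "indexing": {"index"},
    "alignment": {"align_1", "align_2"},
    "counting": {"count_1", "count_2"},
}


def infer_set_dependencies(tasksets, parents):
    """Map each task set name to the names of the sets it depends on."""
    owner = {task: name for name, tasks in tasksets.items() for task in tasks}
    deps = {name: set() for name in tasksets}
    for name, tasks in tasksets.items():
        for task in tasks:
            for parent in parents[task]:
                parent_set = owner.get(parent)
                # ignore parents outside any set and parents in the same set
                if parent_set is not None and parent_set != name:
                    deps[name].add(parent_set)
    return deps


deps = infer_set_dependencies(tasksets, parents)
# static_order() yields each set after the sets it depends on
order = list(TopologicalSorter(deps).static_order())
print(order)
```

A set depends on another as soon as one of its tasks has a parent in that other set, so the user never has to declare set-level edges by hand.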

Execute all tasks

To execute tasks, one chooses a default task mapper (task mappers can also be set at the TaskSet level) and a task filter (a function that decides which tasks should be executed). The order in which TaskSet objects are executed is inferred from the connections between the tasks they contain and does not require the user’s intervention. The task filter provides an additional level of refinement by letting a function control which tasks in each set are executed.

In the example below, we use easy.execution.IterativeExecution to map the tasks, together with the filter introduced earlier in the section TaskSet: only the tasks that are not yet done and whose parent tasks completed successfully are considered for execution (there is no point in trying to execute a task if its parent has not been successful).

from railroadtracks.easy.processing import IterativeExecution
# task mapper to run the tasks
p = IterativeExecution()
tsg.defaultmapper = p

# task filter
tsg.defaultfilter = easy.execution.exec_taskfilter

# execute all tasks that pass the filter
tsg.execute()
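
The division of labour between filter and mapper can be pictured with a toy model. The sketch below is not how TaskSetGraph.execute() is implemented: `run_all`, `todo_only`, and `sequential_mapper` are hypothetical stand-ins, tasks are reduced to (name, status) pairs, and the sets are assumed to be already sorted in dependency order.

```python
def run_all(ordered_sets, task_filter, mapper):
    """Apply `task_filter` to each set in order, then hand the survivors to `mapper`."""
    executed = []
    for taskset in ordered_sets:
        selected = task_filter(taskset)
        executed.extend(mapper(selected))
    return executed


# Hypothetical filter: keep only the tasks still to be done
def todo_only(taskset):
    return [task for task in taskset if task[1] == "TODO"]


# Hypothetical mapper: "run" the tasks one after the other and report their names
def sequential_mapper(tasks):
    return [name for name, _status in tasks]


sets_in_order = [
    [("index", "DONE")],
    [("align_1", "TODO"), ("align_2", "DONE")],
    [("count_1", "TODO"), ("count_2", "TODO")],
]
print(run_all(sets_in_order, todo_only, sequential_mapper))
```

Keeping the filter and the mapper as separate callables means either can be swapped, for instance a parallel mapper or a stricter filter, without touching the loop.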