ruffus.Task

ruffus.task – Overview

Decorator syntax:

Pipelined tasks are created by “decorating” a function with the following syntax:

def func_a():
    pass
    
@follows(func_a)
def func_b():
    pass

Each task is a single function which is applied one or more times to a list of parameters (typically input files, producing a list of output files).

Each of these applications is a separate, independent job (sharing the same code) which can be run in parallel.
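The relationship between a task and its jobs can be sketched in plain Python (an illustration of the idea only, not ruffus code): one function, a list of parameter sets, and one independent job per set.

```python
# Illustrative sketch (not ruffus code): a "task" is one function,
# and each parameter set forms an independent job sharing that code.
def task_func(infile, outfile):
    return "%s -> %s" % (infile, outfile)

job_params = [
    ["a.in", "a.out"],   # job 1
    ["b.in", "b.out"],   # job 2
]

# ruffus would schedule these jobs, possibly in parallel;
# here they simply run one after another
results = [task_func(*params) for params in job_params]
```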

Running the pipeline

To run the pipeline:

pipeline_run(target_tasks, forcedtorun_tasks = [], multiprocess = 1, 
                logger = stderr_logger,
                gnu_make_maximal_rebuild_mode  = True,
                cleanup_log = "../cleanup.log")

pipeline_cleanup(cleanup_log = "../cleanup.log")

Decorators

Task decorators include:

@follows()

@files()

@files_re()

@parallel()

@check_if_uptodate()

@posttask()

@follows

class ruffus.task.follows(parent_task1, "module_X.parent_task2")

@follows (parent_task1, "module_X.parent_task2")

Takes a list of tasks which have to be run before this function. Dependencies can be quoted or unquoted function names; quoted function names allow dependencies to be added before the function is defined.

Functions in other modules need to be fully qualified.

For convenience, mkdir can be used to specify directories which need to be created (if they don’t exist) before the task is run.

e.g:

@follows(task_x, mkdir("/output/directory"), ...)

@parallel

class ruffus.task.parallel([[job1_params, ...], [job2_params, ...], ...])

@parallel ([[job1_params, ...], [job2_params, ...], ...])

@parallel (parameter_generating_func)

The task function will be called iteratively with each set of parameters (possibly in parallel)

No dependency checking is carried out.

Example:

from ruffus import *
import sys

parameters = [
                 ['A', 1, 2], # 1st job
                 ['B', 3, 4], # 2nd job
                 ['C', 5, 6], # 3rd job
             ]
@parallel(parameters)
def parallel_task(name, param1, param2):
    sys.stderr.write("    Parallel task %s: " % name)
    sys.stderr.write("%d + %d = %d\n" % (param1, param2, param1 + param2))

pipeline_run([parallel_task])

@files

class ruffus.task.files([[input_files, output_files...], ...])

@files ([[job1.input, job1.output, job1.optional_extra_parameters], ...])

@files (input_file, output_file, optional_extra_parameters)

@files (custom_function)

The first two parameters in each set represent the input and output of each job. Only out-of-date jobs will be run. By default, this is determined by checking input/output file timestamps. (On some file systems, timestamps have a resolution in seconds.)
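The default timestamp comparison can be sketched independently of ruffus (this illustrates the idea, not the library's actual implementation):

```python
import os

def out_of_date(input_file, output_file):
    # Sketch of timestamp-based dependency checking (not the actual
    # ruffus code): a job is out of date if its output is missing
    # or older than its input.
    if not os.path.exists(output_file):
        return True
    return os.path.getmtime(output_file) < os.path.getmtime(input_file)
```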

The input and output files for each job can be
  • A single file name
  • A list of files
  • None

If the input file is None, the job will run if any output file is missing.

If the output file is None, the job will always run.

If any of the output files is missing, the job will run.

If any of the input files is missing when the job is run, a MissingInputFileError exception will be raised.
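The None and missing-file rules above can be restated as a small decision function (an illustrative sketch, not ruffus code; the timestamp comparison for existing files is omitted):

```python
import os

def must_run(input_file, output_file):
    # Illustrative restatement of the rules above (not ruffus code).
    # Only the None / missing-output rules are shown; the timestamp
    # comparison applied to existing files is omitted.
    if output_file is None:
        return True                              # no output: always run
    outputs = [output_file] if isinstance(output_file, str) else output_file
    return any(not os.path.exists(f) for f in outputs)  # missing output: run

# With input_file None, the job still runs whenever any output is missing.
```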

Example:

from ruffus import *
parameters = [
                    [ 'a.1', 'a.2', 'A file'], # 1st job
                    [ 'b.1', 'b.2', 'B file'], # 2nd job
              ]

@files(parameters)
def parallel_io_task(infile, outfile, text):
    infile_text = open(infile).read()
    open(outfile, "w").write(infile_text + "\n" + text)

pipeline_run([parallel_io_task])

Parameters can be generated on the fly as well. Example:

from ruffus import *
import sys
def generate_parameters_on_the_fly():
    parameters = [
                        ['input_file1', 'output_file1', 1, 2], # 1st job
                        ['input_file2', 'output_file2', 3, 4], # 2nd job
                        ['input_file3', 'output_file3', 5, 6], # 3rd job
                 ]
    for job_parameters in parameters:
        yield job_parameters

@files(generate_parameters_on_the_fly)
def parallel_io_task(input_file, output_file, param1, param2):
    sys.stderr.write("    Parallel task %s: " % input_file)
    sys.stderr.write("%d + %d = %d\n" % (param1, param2, param1 + param2))

pipeline_run([parallel_io_task])

@files_re

class ruffus.task.files_re(glob/file_list, matching_regex, input_file, output_file[, extra_parameters, ...])

@files_re (glob/file_list, matching_regex, output_file)

@files_re (glob/file_list, matching_regex, input_file, output_file, [extra_parameters,...] )

Generates a list of i/o files for each job in the task. Only out-of-date jobs will be run (see @files).

  1. matching_regex is a python regular expression.
  2. The first parameter after the regex gives the input file(s).
  3. The second gives the output file(s).

These are used to check if jobs are up to date.

All parameters can be:

  1. None
  2. A string
  3. A sequence of strings
  4. Anything else

Strings and sequences of strings will be treated as regular expression substitution patterns, using matches from matching_regex.

See python regular expression (re) documentation for details of the syntax

None and all other types of objects are passed through unchanged.

Operation:

  1. Take each file in the glob results (see glob) or in file_list.
  2. Discard all file names which do not match matching_regex.
  3. Generate parameters using regular expression pattern substitution.
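These three steps can be mimicked with the standard re module (an illustrative sketch of the mechanism, not ruffus internals); here a hard-coded file list stands in for the glob results:

```python
import re

# Sketch of the @files_re mechanism (not ruffus internals):
# a hard-coded list stands in for the glob results.
file_list = ["sample1.1", "sample2.1", "notes.txt"]
matching_regex = re.compile(r"(.*)\.1$")

job_params = []
for file_name in file_list:
    if not matching_regex.match(file_name):
        continue                                          # discard non-matches
    output_file = matching_regex.sub(r"\1.2", file_name)  # pattern substitution
    job_params.append([file_name, output_file])
```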

Example:

from ruffus import *
#
#   convert all files ending in ".1" into files ending in ".2"
#
@files_re('*.1', r'(.*)\.1', r'\1.2')
def task_re(infile, outfile):
    open(outfile, "w").write(open(infile).read() + "\nconverted\n")

pipeline_run([task_re])

@check_if_uptodate

class ruffus.task.check_if_uptodate(custom_function)

@check_if_uptodate (dependency_checking_func)

Checks whether a job is up to date and needs to be re-run. dependency_checking_func() must handle the same number of parameters as the task function.

These two examples, using automatic and manual dependency checking, produce the same output.

Example 1: Automatic:

from ruffus import *
@files(None, "a.1")
def create_if_necessary(input_file, output_file):
    open(output_file, "w")
            
pipeline_run([create_if_necessary])

Example 2: Manual, with an explicit dependency check:

from ruffus import *
import os
def check_file_exists(input_file, output_file):
    return not os.path.exists(output_file)

@parallel([[None, "a.1"]])
@check_if_uptodate(check_file_exists)
def create_if_necessary(input_file, output_file):
    open(output_file, "w")

pipeline_run([create_if_necessary])

Both produce the same output:

Task = create_if_necessary
    Job = [null, "a.1"] completed

@posttask

class ruffus.task.posttask(custom_function[, touch_file("a.file")])

Calls functions to signal the completion of each task:

from ruffus import *

def task_finished():
    print "hooray"
    
@posttask(task_finished)
@files(None, "a.1")
def create_if_necessary(input_file, output_file):
    open(output_file, "w")
            
pipeline_run([create_if_necessary])

Note

The function(s) provided to @posttask will be called if ruffus passes through a task, even if none of its jobs are run because they are up to date. This happens when an upstream task is out of date and execution passes through this point in the pipeline.

If touch_file is specified, the enclosed file(s) will be touched:

from ruffus import *

@posttask(touch_file("task_completed.flag"))
@files(None, "a.1")
def create_if_necessary(input_file, output_file):
    open(output_file, "w")

pipeline_run([create_if_necessary])

Pipeline functions

pipeline_run

ruffus.task.pipeline_run(target_tasks, forcedtorun_tasks=[], multiprocess=1, logger=stderr_logger, gnu_make_maximal_rebuild_mode=True)

Run pipelines.

Parameters:
  • target_tasks – target task functions which will be run if they are out-of-date
  • forcedtorun_tasks – task functions which will be run whether or not they are out-of-date
  • multiprocess – the number of concurrent jobs
  • logger (logging object) – where progress will be logged. Defaults to stderr output.
  • gnu_make_maximal_rebuild_mode – Defaults to re-running all out-of-date tasks. Runs the minimal set of tasks needed to build the targets if set to False. Use with caution.

pipeline_printout

ruffus.task.pipeline_printout(output_stream, target_tasks, forcedtorun_tasks=[], long_winded=False, indent=4, gnu_make_maximal_rebuild_mode=True, test_all_task_for_update=True)

Prints out the parts of the pipeline which will be run

Because the parameters of some jobs depend on the results of previous tasks, this function produces only a snapshot of the current task jobs. In particular, tasks which generate a variable number of inputs for following tasks will not produce the full range of jobs.

Parameters:
  • output_stream (file-like object with write() function) – where to print to
  • target_tasks – target task functions which will be run if they are out-of-date
  • forcedtorun_tasks – task functions which will be run whether or not they are out-of-date
  • long_winded – more verbose output
  • indent – how much indentation for pretty printing
  • gnu_make_maximal_rebuild_mode – Defaults to re-running all out-of-date tasks. Runs the minimal set of tasks needed to build the targets if set to False. Use with caution.
  • test_all_task_for_update – ask all task functions if they are up-to-date

pipeline_printout_graph

ruffus.task.pipeline_printout_graph(stream, output_format, target_tasks, forcedtorun_tasks=[], draw_vertically=True, ignore_upstream_of_target=False, skip_uptodate_tasks=False, gnu_make_maximal_rebuild_mode=True, test_all_task_for_update=True, no_key_legend=False)

Print out pipeline dependencies in various formats.

Parameters:
  • stream (file-like object with write() function) – where to print to
  • output_format – [“dot”, “jpg”, “svg”, “ps”, “png”]. All but the first depend on the dot program.
  • target_tasks – target task functions which will be run if they are out-of-date.
  • forcedtorun_tasks – task functions which will be run whether or not they are out-of-date.
  • draw_vertically – Top to bottom instead of left to right.
  • ignore_upstream_of_target – Don’t draw upstream tasks of targets.
  • skip_uptodate_tasks – Don’t draw up-to-date tasks if possible.
  • gnu_make_maximal_rebuild_mode – Defaults to re-running all out-of-date tasks. Runs the minimal set of tasks needed to build the targets if set to False. Use with caution.
  • test_all_task_for_update – Ask all task functions if they are up-to-date.
  • no_key_legend – Don’t draw key/legend for graph.

Logging

class ruffus.task.t_black_hole_logger
Does nothing!
class ruffus.task.t_stderr_logger
Everything to stderr
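
A logger from the standard logging module can usually stand in for these built-in loggers (the assumption here, suggested by the built-in classes, is that ruffus only calls ordinary logging methods such as info/debug on the object passed as logger):

```python
import logging
import sys

# Hypothetical replacement for stderr_logger: a standard library logger.
# The assumption (not stated in this reference) is that ruffus only
# calls ordinary logging methods (info/debug/warning) on `logger`.
logger = logging.getLogger("my_pipeline")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
logger.addHandler(handler)

# pipeline_run(target_tasks, logger=logger) would then send progress here
logger.info("pipeline starting")
```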

Implementation:

Parameter factories:

ruffus.task.file_list_io_param_factory(orig_args)
Factory for functions which yield tuples of (input_file_name, output_file_name, ...).

Examples of orig_args:

1.:

orig_args = "input1", "output1", any_other_parameters1, ...       # files for job 1 

2.:

orig_args = None,     "output1", any_other_parameters2, ...       # files for job 1 

3.:

orig_args = [
              ["input0",               "output0",                ...], # files for job 1
              [["input1a", "input1b"], "output1",                ...], # files for job 2
              ["input2",               ["output2a", "output2b"], ...], # files for job 3
              ["input3",               "output3",                ...], # files for job 4
            ]

Usage:

param_func = file_list_io_param_factory(orig_args)

for params in param_func():
    i, o = params[0:2]
    print " input file name = ", i
    print "output file name = ", o

Note:

  1. Each job requires input/output file names.
  2. Input/output file names can be a string, a list of strings or None.
  3. Either the input or the output file name must be non-None.

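The factory's behaviour can be sketched in plain Python (an illustration of the idea, not the ruffus implementation): normalise the arguments to a list of per-job parameter lists, then return a generator function over a saved copy:

```python
def make_param_func(orig_args):
    # Sketch only (not ruffus code): normalise arguments into a list of
    # per-job parameter lists and return a generator function over a copy.
    if orig_args and isinstance(orig_args[0], (list, tuple)):
        jobs = [list(job) for job in orig_args]   # one sub-list per job
    else:
        jobs = [list(orig_args)]                  # flat args: a single job

    def param_func():
        for job_params in jobs:
            yield job_params
    return param_func

param_func = make_param_func([["input0", "output0"], ["input1", "output1"]])
params = list(param_func())
```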
ruffus.task.glob_regex_io_param_factory(glob_str_or_list, matching_regex, *parameters)
Factory for functions which in turn yield tuples of (input_file_name, output_file_name, ...).

Usage:

1.:

param_func = glob_regex_io_param_factory("/etc/*",          # glob                                  
                                         "(file_)(\d+)",    # which match this regex                
                                         "input_file_",   # pattern to generate input file names    
                                         "output_file_")  # pattern to generate output file names  

or 2.:

param_func = glob_regex_io_param_factory("/etc/*",         # glob                                  
                                         "(file_)(\d+)",   # which match this regex                
                                         None,             # use originals as input file names     
                                         "output_file_") # pattern to generate output file names   

or 3.:

param_func = glob_regex_io_param_factory(file_list,        # list of files
                                         "(file_)(\d+)",   # which match this regex                
                                         None,             # use originals as input file names     
                                         "output_file_") # pattern to generate output file names   

for i, o in param_func():                                                                          
    print " input file name = " , i                                                                
    print "output file name = " , o                                                                
Note:

  1. param_func has to be called each time.
  2. glob is called each time, so do not expect the file lists from param_func() to be the same for each invocation.
  3. A copy of the file list is saved, so modifying the original list afterwards will not change the input/output files.

Wrappers around jobs:

ruffus.task.job_wrapper_generic(param, user_defined_work_func, register_cleanup)
Run the task function.
ruffus.task.job_wrapper_io_files(param, user_defined_work_func, register_cleanup)
Run the task function on input/output files if they are not up to date.
ruffus.task.job_wrapper_mkdir(param, user_defined_work_func, register_cleanup)
Make directories if they do not exist.

Checking if a job is up to date:

ruffus.task.needs_update_check_modify_time(i, o, *other)
Given input and output files, check that all exist and that the output files are newer than the input files. Each can be None, “file1” or [“file1”, “file2”, “file3”]. None means always make.
ruffus.task.needs_update_check_directory_missing(dirs)
Called per directory: does it exist? Is it an ordinary file rather than a directory? (If so, throw an exception.)

Exceptions and Errors