Pipelined tasks are created by “decorating” a function with the following syntax:
def func_a():
    pass

@follows(func_a)
def func_b():
    pass

Each task is a single function which is applied one or more times to a list of parameters (typically input files, to produce a list of output files).
Each of these is a separate, independent job (sharing the same code) which can be run in parallel.
To run the pipeline:
pipeline_run(target_tasks, forcedtorun_tasks = [], multiprocess = 1,
             logger = stderr_logger,
             gnu_make_maximal_rebuild_mode = True,
             cleanup_log = "../cleanup.log")

pipeline_cleanup(cleanup_log = "../cleanup.log")
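For example, a minimal sketch of a complete pipeline run might look like this (the task name final_task and the file name result.txt are placeholders for illustration):

from ruffus import *

@files(None, "result.txt")
def final_task(input_file, output_file):
    open(output_file, "w").write("done\n")

# run whatever is needed to bring final_task up to date,
# using up to 4 concurrent processes
pipeline_run([final_task], multiprocess = 4)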
Task decorators include:
@follows (parent_task1, “module_X.parent_task2”)
Takes a list of tasks which have to be run before this function. Dependencies can be quoted or unquoted function names. Quoted function names allow dependencies to be added before the function is defined.
Functions in other modules need to be fully qualified
For convenience, mkdir can be used to specify directories which need to be created (if they don’t exist) before the task is run.
e.g.:
@follows(task_x, mkdir("/output/directory") ...)
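A minimal sketch showing both a quoted dependency and mkdir (the task and directory names here are made up for illustration):

from ruffus import *

@files(None, "a.start")
def task_x(input_file, output_file):
    open(output_file, "w")

# "task_x" is quoted, so this decorator could equally appear before task_x is defined;
# mkdir ensures the output directory exists before task_y runs
@follows("task_x", mkdir("output_dir"))
@files("a.start", "output_dir/a.finished")
def task_y(input_file, output_file):
    open(output_file, "w").write(open(input_file).read())

pipeline_run([task_y])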
@parallel ([[job1_params, ...], [job2_params, ...]...])
@parallel (parameter_generating_func)
The task function will be called iteratively with each set of parameters (possibly in parallel)
No dependency checking is carried out.
Example:
from ruffus import *
import sys

parameters = [
    ['A', 1, 2], # 1st job
    ['B', 3, 4], # 2nd job
    ['C', 5, 6], # 3rd job
]

@parallel(parameters)
def parallel_task(name, param1, param2):
    sys.stderr.write("    Parallel task %s: " % name)
    sys.stderr.write("%d + %d = %d\n" % (param1, param2, param1 + param2))

pipeline_run([parallel_task])
@files ([[job1.input, job1.output, job1.optional_extra_parameters], ...])
@files (input_file, output_file, optional_extra_parameters)
@files (custom_function)
The first two parameters in each set represent the input and output of each job. Only out-of-date jobs will be run. By default, this is determined by comparing input/output file timestamps. (On some file systems, timestamps have a resolution in seconds.)
If the input file is None, the job will run if any output file is missing.
If the output file is None, the job will always run.
If any of the output files is missing, the job will run.
If any of the input files is missing when the job is run, a MissingInputFileError exception will be raised.
Example:
from ruffus import *

parameters = [
    [ 'a.1', 'a.2', 'A file'], # 1st job
    [ 'b.1', 'b.2', 'B file'], # 2nd job
]

@files(parameters)
def parallel_io_task(infile, outfile, text):
    infile_text = open(infile).read()
    open(outfile, "w").write(infile_text + "\n" + text)

pipeline_run([parallel_io_task])
Parameters can be generated on the fly as well. Example:
from ruffus import *
import sys

def generate_parameters_on_the_fly():
    parameters = [
        ['input_file1', 'output_file1', 1, 2], # 1st job
        ['input_file2', 'output_file2', 3, 4], # 2nd job
        ['input_file3', 'output_file3', 5, 6], # 3rd job
    ]
    for job_parameters in parameters:
        yield job_parameters

@files(generate_parameters_on_the_fly)
def parallel_io_task(input_file, output_file, param1, param2):
    sys.stderr.write("    Parallel task %s: " % input_file)
    sys.stderr.write("%d + %d = %d\n" % (param1, param2, param1 + param2))

pipeline_run([parallel_io_task])
@files_re (glob/file_list, matching_regex, output_file)
@files_re (glob/file_list, matching_regex, input_file, output_file, [extra_parameters,...] )
Generates the list of input/output files for each job in the task. Only out-of-date jobs will be run (see @files).
These are used to check if jobs are up to date.
All parameters can be:
- None
- A string
- A sequence of strings
- Anything else
Strings and sequences of strings will be treated as regular expression substitution patterns, using matches from matching_regex.
See the Python regular expression (re) documentation for details of the syntax.
None and all other types of objects are passed through unchanged.
Operation:
- For each file in the glob results (see glob) or in file_list:
- Discard all file names which do not match matching_regex
- Generate parameters using regular expression pattern substitution
Example:
from ruffus import *
#
# convert all files ending in ".1" into files ending in ".2"
#
@files_re('*.1', '(.*).1', r'\1.2')
def task_re(infile, outfile):
open(outfile, "w").write(open(infile).read() + "\nconverted\n")
pipeline_run([task_re])
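The longer form with an explicit input pattern and extra parameters is not shown above; a sketch of how it might look follows (the file names and the extra "stem" parameter are made up for illustration). Because the extra parameter is a string, it too is treated as a substitution pattern, so \1 is replaced by the matched file stem:

from ruffus import *

#
#   convert all files ending in ".1" into files ending in ".2",
#   passing the matched file "stem" through as an extra parameter
#
@files_re('*.1', r'(.*)\.1', r'\1.1', r'\1.2', r'\1')
def task_re_extra(infile, outfile, stem):
    open(outfile, "w").write(open(infile).read() + "\nconverted " + stem + "\n")

pipeline_run([task_re_extra])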
@check_if_uptodate (dependency_checking_func)
Checks whether a job is up to date and needs to be run. dependency_checking_func() needs to accept the same number of parameters as the task function.
The following two examples, using automatic and manual dependency checking respectively, produce the same output. Example 1: Automatic:
from ruffus import *
@files(None, "a.1")
def create_if_necessary(input_file, output_file):
open(output_file, "w")
pipeline_run([create_if_necessary])
This could be rewritten with manual dependency checking. Example 2: Manual:
from ruffus import *
import os
def check_file_exists(input_file, output_file):
return not os.path.exists(output_file)
@parallel([[None, "a.1"]])
@check_if_uptodate(check_file_exists)
def create_if_necessary(input_file, output_file):
open(output_file, "w")
pipeline_run([create_if_necessary])
Both produce the same output:
Task = create_if_necessary
Job = [null, "a.1"] completed
@posttask (signal_task_completion_func, ...)

Calls the supplied functions to signal the completion of each task:
from ruffus import *
def task_finished():
print "hooray"
@posttask(task_finished)
@files(None, "a.1")
def create_if_necessary(input_file, output_file):
open(output_file, "w")
pipeline_run([create_if_necessary])
Note
The function(s) provided to @posttask will be called whenever Ruffus passes through a task, even if none of its jobs are run because they are up to date. This happens when an upstream task is out of date and execution passes through this point in the pipeline.
If touch_file is specified, the enclosed file(s) will be touched:
from ruffus import *
@posttask(touch_file("task_completed.flag"))
@files(None, "a.1")
def create_if_necessary(input_file, output_file):
open(output_file, "w")
pipeline_run([create_if_necessary])
pipeline_run

Run pipelines.
pipeline_printout

Prints out the parts of the pipeline which will be run.
Because the parameters of some jobs depend on the results of previous tasks, this function produces only the current snapshot of task jobs. In particular, tasks which generate a variable number of inputs into following tasks will not produce the full range of jobs.
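A minimal sketch of how this might be called (final_task is a placeholder for the last task in your pipeline; the output-stream and target-task arguments shown here are an assumption and may differ between versions):

import sys
from ruffus import *

# show which jobs of final_task (and of its upstream tasks) would be run,
# without actually running anything
pipeline_printout(sys.stdout, [final_task])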
pipeline_printout_graph

Prints out pipeline dependencies in various formats.
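A sketch of a typical call (the open-stream / format / target-task argument order is an assumption based on common usage, and final_task is a placeholder):

from ruffus import *

# write the dependency graph leading to final_task as an SVG flowchart;
# other formats such as "png" or "dot" are typically available,
# depending on whether graphviz is installed
pipeline_printout_graph(open("flowchart.svg", "w"), "svg", [final_task])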
file_list_io_param_factory

Examples of orig_args:
1.:
orig_args = "input1", "output1", any_other_parameters1, ... # files for job 1
2.:
orig_args = None, "output1", any_other_parameters2, ... # files for job 1
3.:
orig_args = [
    ["input0", "output0", ...],                # files for job 1
    [["input1a", "input1b"], "output1", ...],  # files for job 2
    ["input2", ["output2a", "output2b"], ...], # files for job 3
    ["input3", "output3", ...],                # files for job 4
]
Usage:
param_func = file_list_io_param_factory(orig_args)

for params in param_func():
    i, o = params[0:2]
    print " input file name = ", i
    print "output file name = ", o
Note:
1. Each job requires input/output file names
2. Input/output file names can be a string, a list of strings or None
3. Either the input or the output file name must be non-None
glob_regex_io_param_factory

Usage:
1.:
param_func = glob_regex_io_param_factory("/etc/*", # glob
"(file_)(\d+)", # which match this regex
"input_file_", # pattern to generate input file names
"output_file_") # pattern to generate output file names
or 2.:
param_func = glob_regex_io_param_factory("/etc/*", # glob
"(file_)(\d+)", # which match this regex
None, # use originals as input file names
"output_file_") # pattern to generate output file names
or 3.:
param_func = glob_regex_io_param_factory(file_list, # list of files
"(file_)(\d+)", # which match this regex
None, # use originals as input file names
"output_file_") # pattern to generate output file names
for i, o in param_func():
print " input file name = " , i
print "output file name = " , o