Invenio-Workflows

Invenio module for running a determined set of tasks.

This is an experimental development preview release.

Contents:

API

This module allows you to easily push your data through a determined set of tasks and stop/continue execution if necessary.

Create a workflow

Create a workflow for your data using functions as individual tasks.

from invenio_workflows.tasks.sample_tasks import (
    add_data,
    halt_if_higher_than_20,
)

class myworkflow(object):
    \"\"\"Add 20 to integer and halt if higher.\"\"\"
    workflow = [add_data(20),
                halt_if_higher_than_20]

Save it as a new file in your module located under workflows/ with the same name as the class. For example at yourmodule/workflows/myworkflow.py.

The workflow attribute should be a list of functions (or list of lists of tasks) as per the conditions of the underlying workflows-module.
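
For example, a minimal sketch of a definition that mixes plain tasks with a nested group (the grouping semantics come from the underlying workflows module; the class name is just an example and the file would be yourmodule/workflows/mygroupedworkflow.py):

from invenio_workflows.tasks.sample_tasks import (
    add_data,
    halt_if_higher_than_20,
)

class mygroupedworkflow(object):
    """Sketch: tasks can also be grouped in nested lists."""
    workflow = [
        add_data(10),
        [add_data(10),              # a nested group of tasks
         halt_if_higher_than_20],
    ]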

Create a task

The functions in the workflow are called tasks. Each task must at least take two arguments:

def halt_if_higher_than_20(obj, eng):
    \"\"\"Check if current data is more than than 20.\"\"\"
    if obj.data > 20:
        eng.halt("Data higher than 20.")
obj (models.BibWorkflowObject)

is the current object being worked on.

obj adds extra functionality by wrapping around your data and provides utilities to interface with the Holding Pen interface.

eng (engine.BibWorkflowEngine)

is the current instance of the workflow engine.

eng gives you access to manipulate the workflow execution itself and to retrieve all the objects being processed.

Other parameters may be passed as *args or **kwargs.

Pass additional arguments

To allow arguments to be passed to the task from the workflow definition, simply wrap your task in a closure:

def add_data(data_param):
    \"\"\"Add data_param to the obj.data.\"\"\"
    def _add_data(obj, eng):
        data = data_param
        obj.data += data

    return _add_data

It can then be called from the workflow definition as add_data(20), returning the inner function.

Run a workflow

Finally, to run your workflow there are mainly two use cases:

  • run only a single data object, or
  • run multiple data objects through a workflow.

The former uses the models.BibWorkflowObject model API, and the latter uses the api module.

Run a single data object

Note

This method is recommended if you only have one data item you want to run through the workflow.

from invenio_workflows.models import BibWorkflowObject
myobj = BibWorkflowObject.create_object()
myobj.set_data(10)
eng = myobj.start_workflow("myworkflow")

Once the workflow completes it will return the engine instance that ran it.

To get the data, simply call the get_data() function of models.BibWorkflowObject

myobj.get_data()  # outputs: 30

Run multiple data objects

Note

This method is recommended if you need to run several objects through a workflow.

To do this simply import the workflows API function start() and provide a list of objects:

from invenio_workflows.api import start
eng = start(workflow_name="myworkflow", data=[5, 10])

Here we are passing simple data objects in the form of integers.

As usual, the start() function returns the eng instance that ran the workflow. You can query this object to retrieve the data you sent in:

len(eng.objects)  # outputs: 4

Why 4 objects when we only shipped 2 objects? Well, we take initial snapshots (copies of the BibWorkflowObject) of the original data. In the example above, we get 4 objects back as each object passed has a snapshot created.

You can also query the engine instance to only give you the objects which are in a certain state.

len(eng.initial_objects)  # outputs: 2

len(eng.halted_objects)  # outputs: 2

len(eng.completed_objects)  # outputs: 0

len(eng.running_objects)  # outputs: 0

len(eng.waiting_objects)  # outputs: 0

len(eng.error_objects)  # outputs: 0

(eng.completed_objects is empty because both objects passed are halted.)

This output actually represents snapshots of the objects, not the objects themselves. The default snapshotting behaviour is also evident here: one snapshot is taken at the beginning of the execution and one when the object reaches one of the other states. A snapshot can only be in a single state.

No object will ever be in the running state under usual operation.

Moreover, to retrieve the data from the first object, you can use get_data() as with single objects:

res = eng.halted_objects[0].get_data()
print(res)
# outputs: 25

Run workflows asynchronously

So far we have been running our workflows in the current process. However, for long-running processes we do not want to wait for the workflow to finish before continuing.

Luckily, there is an API to do this:

BibWorkflowObject.start_workflow(delayed=True)
As when running single objects, you can pass the delayed parameter to enable asynchronous execution.
api.start_delayed()
The API provides the function start_delayed() to run a workflow asynchronously (see the sketch below).
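
A minimal sketch of both entry points (this assumes the workflow name and delayed=True can be combined in start_workflow(), and requires a running task queue as described below):

from invenio_workflows.api import start_delayed
from invenio_workflows.models import BibWorkflowObject

# Single data object, executed in the background.
myobj = BibWorkflowObject.create_object()
myobj.set_data(10)
async_result = myobj.start_workflow("myworkflow", delayed=True)

# Several data objects at once.
async_result = start_delayed(workflow_name="myworkflow", data=[5, 10])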

To use this functionality you need to make sure you are running a task queue such as Celery that will run the workflow in a separate process.

Note

The delayed API returns a worker_result.AsynchronousResultWrapper instead of an engine.BibWorkflowEngine instance.

In order to communicate with such a task queue we make use of worker plugins.

Workers

A worker is a plugin (or bridge) from the Invenio workflows module to some distributed task queue. By default, we have provided workers for Celery and RQ.

These plugins are used by the worker_engine to launch workflows asynchronously in a task queue.

We recommend using Celery as the default asynchronous worker.

Working with extra data

If you need to add some extra data to the models.BibWorkflowObject that is not suitable to add to the obj.data attribute, you can make use of the obj.extra_data attribute.

The extra_data attribute is basically a normal dictionary that you can fill. However, it contains some additional information by default.

{
    "_tasks_results": {},
    "owner": {},
    "_task_counter": {},
    "_error_msg": None,
    "_last_task_name": "",
    "latest_object": -1,
    "_action": None,
    "redis_search": {},
    "source": "",
    "_task_history: [],
}

This information is used by the models.BibWorkflowObject to store some additional data related to the workflow execution and additional data added by tasks.
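
For example, a task might record provenance information alongside the data (a minimal sketch; tag_source and the "notes" key are illustrative, while "source" is one of the default keys listed above):

def tag_source(obj, eng):
    """Store provenance information in the object's extra data."""
    # extra_data behaves like a normal dictionary; "source" is one of the
    # default keys shown above, and new keys can be added freely.
    obj.extra_data["source"] = "my_harvester"
    obj.extra_data["notes"] = ["imported by tag_source"]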

It also stores information that is integrated with Holding Pen - the graphical interface for all the data objects.

Holding Pen

The graphical interface over all the data objects that have been executed in a workflow.

The name Holding Pen originates from a library use case of having some incoming bibliographical meta-data records on “hold” - awaiting some human curator to analyze the record and decide if the record should be inserted into the repository.

One common usage of this interface is acceptance of user submissions.

We will take this concept of record approval further throughout this guide as we explain the most common use cases for the Holding Pen.

Note

The Holding Pen is accessible under /admin/holdingpen

Data object display in Holding Pen

To properly represent a data object in the Holding Pen, the workflow definition explained above can be further enriched by adding some static functions to the class (a sketch follows the list below).

  • get_title: return the “title” of the data object shown in the table display.

    E.g. title of meta-data record

  • get_description: return a short description of the data object shown in the table display.

    E.g. identifiers and categories

  • formatter: used in the object detailed display to render the data in the object for the user.

    E.g. the detailed record format of a meta-data record.
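
As a sketch, the workflow class from the beginning of this guide could be extended like this (we assume each hook receives the models.BibWorkflowObject; the exact signatures may differ):

class myworkflow(object):
    """Add 20 to integer and halt if higher, with Holding Pen display hooks."""

    workflow = [add_data(20),
                halt_if_higher_than_20]

    @staticmethod
    def get_title(bwo):
        # Title shown in the Holding Pen table display.
        return "Integer object #{0}".format(bwo.id)

    @staticmethod
    def get_description(bwo):
        # Short description shown in the table display.
        return "Current value: {0}".format(bwo.get_data())

    @staticmethod
    def formatter(bwo, **kwargs):
        # Rendering used on the detailed object display.
        return "<pre>{0}</pre>".format(bwo.get_data())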

Actions in Holding Pen

An action in Holding Pen is a generic term describing an action that can be taken on a data object.

To use the example of record approval, we basically mean adding GUI buttons to accept or reject a data object. The action taken (the button pressed) on the data object is then connected to a custom action back-end that may then decide to e.g. continue the workflow or simply delete the object.

Adding an action

By default we have added an approval action which can be used to allow a data object to continue the workflow or be deleted.

workflows/actions/approval.py
Action back-end located in workflows/actions/approval.py that implements render(), render_mini() and resolve(). resolve() handles the continuation or deletion of the data object using the workflows API.
templates/workflows/actions/approval_(main|mini|side).html

Jinja templates used to render the action UI. There are different templates in play depending on position.

  • mini: displayed in the main table (for a quick action).
  • side: displayed on the right side of the object details page.
  • main: displayed in the middle of the object details page.
static/workflows/actions/approval.js
JavaScript file listening to events in the approval UI, calling the back-end via AJAX calls.

To enable the JavaScript to be loaded via requireJS, you need to override the actions “init” JavaScript file static/workflows/actions/init.js on your overlay and add any initialization code for the action (e.g. attaching events).

Using an action

There are two ways of activating an action:

  • When halting a workflow: engine.BibWorkflowEngine.halt() has a parameter that allows you to set an action that needs to be taken in the Holding Pen - along with a message to be displayed.
  • Directly using the models.BibWorkflowObject API: models.BibWorkflowObject.set_action(), models.BibWorkflowObject.remove_action() and models.BibWorkflowObject.get_action() (see the sketch below).
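
A minimal sketch of both approaches inside a task (the exact argument names of halt() and set_action() are assumptions):

def request_approval(obj, eng):
    """Ask a curator to act on the object (sketch)."""
    # Option 1: attach the action directly via the object API
    # (argument names are assumptions).
    obj.set_action("approval", "Please review this record.")

    # Option 2: halt the engine with an action attached
    # (the action keyword name is an assumption).
    eng.halt("Record needs approval.", action="approval")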

Task results in Holding Pen

If you want to add some special task results to be displayed on the details page of the data object in Holding Pen, you can use the task results API available in models.BibWorkflowObject.

The API provides functions to manipulate the task results:

models.BibWorkflowObject.add_task_result()
Adds a task result to the end of a list associated with a label (name).
models.BibWorkflowObject.update_task_results()
Update task result for a specific label (name).
models.BibWorkflowObject.get_tasks_results()
Return all task results as a dictionary of the form { name: [result, ..] }

The task result is a dictionary given as context to the template when rendered. The result given here is added to a list of results for this name.

obj = BibWorkflowObject()  # or BibWorkflowObject.query.get(id)
obj.add_task_result("foo", my_result, "path/to/template")

See sample templates under templates/workflows/results/*.html.
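
To read the results back later, for example in a custom view, get_tasks_results() returns them grouped by name:

all_results = obj.get_tasks_results()
all_results["foo"]  # list of the results added under the "foo" label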

Main API for the workflows.

If you want to run a workflow using the workflows module, this is the high level API you will want to use.

class invenio_workflows.api.WorkerBackend

WorkerBackend is a class representing the worker.

When called, it will automatically select the worker according to the configuration.

worker

Represent the worker.

This cached property returns the worker to use.

Returns:the worker configured in the configuration file.
invenio_workflows.api.continue_oid(oid, start_point='continue_next', **kwargs)

Continue workflow for given object id (oid).

Depending on start_point it may start from previous, current or next task.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a task-id from BibSched, the current user etc.

Parameters:
  • oid (str) – id of BibWorkflowObject to run.
  • start_point (str) – where should the workflow start from? One of: restart_prev (restart from the previous task), continue_next (continue to the next task) or restart_task (restart the current task).
Returns:

BibWorkflowEngine that ran the workflow
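
For example, to re-run the current task of a halted object (a minimal sketch; my_halted_oid is a placeholder):

from invenio_workflows.api import continue_oid

# my_halted_oid is a placeholder for the id of an existing, halted object.
eng = continue_oid(oid=my_halted_oid, start_point="restart_task")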

invenio_workflows.api.continue_oid_delayed(oid, start_point='continue_next', **kwargs)

Continue workflow for given object id (oid), asynchronously.

Similar behavior as continue_oid(), except it runs it asynchronously.

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters:
  • oid (str) – id of BibWorkflowObject to run.
  • start_point (str) – where should the workflow start from? One of: restart_prev (restart from the previous task), continue_next (continue to the next task) or restart_task (restart the current task).
Returns:

AsynchronousResultWrapper.

invenio_workflows.api.resume_objects_in_workflow(id_workflow, start_point='continue_next', **kwargs)

Resume workflow for any halted or failed objects from given workflow.

This is a generator function and will yield every workflow created per object which needs to be resumed.

To identify the original workflow containing the halted objects, the ID (or UUID) of the workflow is required. The starting point to resume the objects from can optionally be given. By default, the objects resume with their next task in the workflow.

Parameters:
  • id_workflow (str) – id of Workflow with objects to resume.
  • start_point (str) – where should the workflow start from? One of: restart_prev (restart from the previous task), continue_next (continue to the next task) or restart_task (restart the current task).

yield: BibWorkflowEngine that ran the workflow
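
A minimal usage sketch, assuming my_workflow_uuid refers to a workflow containing halted objects:

from invenio_workflows.api import resume_objects_in_workflow

# my_workflow_uuid is a placeholder for the id of a workflow with halted objects.
for eng in resume_objects_in_workflow(id_workflow=my_workflow_uuid):
    print(len(eng.completed_objects))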

invenio_workflows.api.start(workflow_name, data, **kwargs)

Start a workflow by given name for specified data.

The name of the workflow to start is considered unique and it is equal to the name of a file containing the workflow definition.

The data passed should be a list of object(s) to run through the workflow. For example: a list of dicts, JSON strings, BibWorkflowObjects, etc.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a taskid from BibSched, the current user etc.

The workflow engine object generated is returned upon completion.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • data (list) – the data to run through the workflow, e.g. a list of dicts, JSON strings or BibWorkflowObjects.
Returns:

BibWorkflowEngine that ran the workflow.

invenio_workflows.api.start_by_oids(workflow_name, oids, **kwargs)

Start workflow by name with models.BibWorkflowObject ids.

Wrapper to call start() with list of models.BibWorkflowObject ids.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a task-id from BibSched, the current user etc.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • oids (list) – BibWorkflowObject ids to run.
Returns:

BibWorkflowEngine that ran the workflow.
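
For example (a sketch; the ids are placeholders for existing BibWorkflowObject ids):

from invenio_workflows.api import start_by_oids

eng = start_by_oids("myworkflow", oids=[1, 2, 3])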

invenio_workflows.api.start_by_oids_delayed(workflow_name, oids, **kwargs)

Asynchronously start a workflow by name with models.BibWorkflowObject ids.

Similar behavior as start_by_oids(), except it calls start_delayed().

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • oids (list) – list of BibWorkflowObject ids to run.
Returns:

AsynchronousResultWrapper.

invenio_workflows.api.start_by_wid(wid, **kwargs)

Re-start given workflow, by workflow uuid (wid).

It is restarted from the beginning with the original data given.

Special custom keyword arguments can be given to the workflow engine in order to pass certain variables to the tasks in the workflow execution, such as a task-id from BibSched, the current user etc.

Parameters:wid (str) – the workflow uuid. Ex: “550e8400-e29b-41d4-a716-446655440000”.
Returns:BibWorkflowEngine that ran the workflow.
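
For example, reusing the example uuid from above (a sketch):

from invenio_workflows.api import start_by_wid

eng = start_by_wid(wid="550e8400-e29b-41d4-a716-446655440000")
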
invenio_workflows.api.start_by_wid_delayed(wid, **kwargs)

Re-start given workflow, by workflow uuid (wid), asynchronously.

Similar behavior as start_by_wid(), except it starts the workflow delayed by using one of the defined workers available.

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the function AsynchronousResultWrapper.get.

Parameters:wid (str) – the workflow uuid. Ex: “550e8400-e29b-41d4-a716-446655440000”.
Returns:AsynchronousResultWrapper
invenio_workflows.api.start_delayed(workflow_name, data, **kwargs)

Start a workflow by given name for specified data, asynchronously.

Similar behavior as start(), except it starts the workflow delayed by using one of the defined workers available.

For example, it may enqueue the execution of the workflow in a task queue such as Celery (http://celeryproject.org).

This function returns a sub-classed AsynchronousResultWrapper that holds a reference to the workflow id via the object AsynchronousResultWrapper.get.

Parameters:
  • workflow_name (str) – the workflow name to run. Ex: “my_workflow”.
  • data (list) – the data to run through the workflow, e.g. a list of dicts, JSON strings or BibWorkflowObjects.
Returns:

AsynchronousResultWrapper

Models

Engine

Workers

Contains the AsynchronousResultWrapper class for asynchronous execution.

class invenio_workflows.worker_result.AsynchronousResultWrapper(asynchronousresult)

Wrap results from asynchronous execution.

This is an abstract class. When you inherit from it you must implement all of its functions.

This class exists for two reasons: it provides a unified interface for all workers, so you can switch from one to another seamlessly, and it adds features on top of their functions.

For example, the get method now allows post-processing of the result.

get(postprocess=None)

Return the value of the process.

status()

Return the current status of the tasks.
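
A minimal usage sketch (what get() returns depends on the concrete worker; treat the details as assumptions):

from invenio_workflows.api import start_delayed

async_result = start_delayed("myworkflow", data=[5, 10])
print(async_result.status())   # state of the underlying task
result = async_result.get()    # block until the asynchronous execution finishes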

invenio_workflows.worker_result.uuid_to_workflow(uuid)

Return the workflow associated with a uuid.

Views
