The XWorkflow library has two main aspects:
A workflow is defined by subclassing the Workflow class, and setting a few specific attributes:
class MyWorkflow(xworkflows.Workflow):
# The states in the workflow
states = (
('init', _(u"Initial state")),
('ready', _(u"Ready")),
('active', _(u"Active")),
('done', _(u"Done")),
('cancelled', _(u"Cancelled")),
)
# The transitions between those states
transitions = (
('prepare', 'init', 'ready'),
('activate', 'ready', 'active'),
('complete', 'active', 'done'),
('cancel', ('ready', 'active'), 'cancelled'),
)
# The initial state of objects using that workflow
initial_state = 'init'
Those attributes will be transformed into similar attributes with friendlier APIs:
The states attribute, a StateList instance, provides a mixed dictionary/object API:
>>> MyWorkflow.states.init
State('init')
>>> MyWorkflow.states.init.title
u"Initial state"
>>> MyWorkflow.states['ready']
State('ready')
>>> 'active' in MyWorkflow.states
True
>>> MyWorkflow.states.init in MyWorkflow.states
True
>>> list(MyWorkflow.states) # definition order is kept
[State('init'), State('ready'), State('active'), State('done'), State('cancelled')]
The transitions attribute of a Workflow is a TransitionList instance, exposing a mixed dictionary/object API:
>>> MyWorkflow.transitions.prepare
Transition('prepare', [State('init')], State('ready'))
>>> MyWorkflow.transitions['cancel']
Transition('cancel', [State('ready'), State('actuve')], State('cancelled'))
>>> 'activate' in MyWorkflow.transitions
True
>>> MyWorkflow.transitions.available_from(MyWorkflow.states.ready)
[Transition('activate'), Transition('cancel')]
>>> list(MyWorkflow.transitions) # Definition order is kept
[Transition('prepare'), Transition('activate'), Transition('complete'), Transition('cancel')]
The process to apply a Workflow to an object is quite straightforward:
These attributes will be transformed into StateProperty objects, acting as a wrapper around the State held in the object’s internal __dict__.
For each transition of each related Workflow, the WorkflowEnabledMeta metaclass will add or enhance a method for each transition, according to the following rules:
For a WorkflowEnabled object, each <attr> = SomeWorkflow() definition is translated into a StateProperty object, which adds a few functions to a plain attribute:
It checks that any value set is a valid State from the related Workflow:
>>> obj = MyObject()
>>> obj.state = State('foo')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: Value State('foo') is not a valid state for workflow MyWorkflow.
It defaults to the initial_state of the Workflow if no value was set:
>>> obj = MyObject()
>>> obj.state
State('init')
It wraps retrieved values into a StateWrapper, which adds a few extra attributes:
Access to the related workflow:
>>> obj.state.workflow
<Workflow: MyWorkflow>
List of accessible transitions:
>>> obj.state.transitions
[Transition('accept')]
Easy testing of the current value:
>>> obj.state.is_init
True
>>> obj.state.is_ready
False
Native equivalence to the state's name:
>>> obj.state == 'init'
True
>>> obj.state == 'ready'
False
>>> obj.state in ['init', 'ready']
True
Note
This behavior should only be used when accessing the State objects from the Workflow.states list is impossible, e.g comparison with external data (URL, database, ...).
Using State objects or the is_XXX attributes protects from typos in the code (AttributeError would be raised), whereas raw strings provide no such guarantee.
In order to link a state change with specific code, a WorkflowEnabled object must simply have a method decorated with the transition() decorator.
If that method cannot be defined with the name of the related Transition, the name of that Transition should be passed as first argument to the transition() decorator:
class MyObject(xworkflows.WorkflowEnabled):
state = MyWorkflow()
@xworkflows.transition()
def accept(self):
pass
@xworkflows.transition('cancel')
def do_cancel(self):
pass
Once decorated, any call to that method will perfom the following steps:
Transitions for which no implementation was defined will have a basic noop() implementation.
According to the order above, preventing a State change can be done:
Additional control over the transition implementation can be obtained via hooks. 5 kinds of hooks exist:
The hook decorators all accept the following arguments:
A list of Transition names (for transition-related hooks) or State names (for state-related hooks); if empty, the hook will apply to all transitions:
@xworkflows.before_transition()
@xworkflows.after_transition('foo', 'bar')
def hook(self, *args, **kwargs):
pass
As a keyword field= argument, the name of the field whose transitions the hook applies to (when an instance uses more than one workflow):
class MyObject(xworkflows.WorkflowEnabled):
state1 = SomeWorkflow()
state2 = AnotherWorkflow()
@xworkflows.on_enter_state(field='state2')
def hook(self, res, *args, **kwargs):
# Only called for transitions on state2.
pass
As a keyword priority= argument (default: 0), the priority of the hook; hooks are applied in decreasing priority order:
class MyObject(xworkflows.WorkflowEnabled):
state = SomeWorkflow()
@xworkflows.before_transition('*', priority=-1)
def last_hook(self, *args, **kwargs):
# Will be called last
pass
@xworkflows.before_transition('foo', priority=10)
def first_hook(self, *args, **kwargs):
# Will be called first
pass
Hook decorators can also be stacked, in order to express compless hooking systems:
@xworkflows.before_transition('foobar', priority=4)
@xworkflows.on_leave_state('baz')
def hook(self, *args, **kwargs):
pass
The order in which hooks are applied is computed based on the following rules:
Sort that list from higher to lower priority, and in alphabetical order if priority match
In the following code snippet, the order is hook3, hook1, hook4, hook2:
@xworkflows.before_transition()
def hook1(self):
pass
@xworkflows.before_transition(priority=-1)
def hook2(self):
pass
@xworkflows.before_transition(priority=10)
def hook3(self):
pass
@xworkflows.on_leave_state()
def hook4(self):
pass
Hooks can also be bound to the implementation at the transition() level:
@xworkflows.transition(check=some_fun, before=other_fun, after=something_else)
def accept(self):
pass
Deprecated since version 0.4.0: Use before_transition(), after_transition() and transition_check() instead; will be removed in 0.5.0.
The old behaviour did not allow for hook overriding in inherited workflows.
Some programs may need to display available transitions, without calling them. Instead of checking manually the state of the object and calling the appropriate transition_check() hooks if defined, you should simply call myobj.some_transition.is_available():
class MyObject(WorkflowEnabled):
state = MyWorkflow
x = 13
@transition_check('accept')
def check(self):
return self.x == 42
def accept(self):
pass
@transition()
def cancel(self):
pass
>>> obj = MyObject()
>>> obj.accept.is_available() # Forbidden by 'check'
False
>>> obj.cancel.is_available() # Forbidden by current state
False
>>> obj.x = 42
>>> obj.accept.is_available()
True
The log_transition() method of a Workflow allows logging each Transition performed by an object using that Workflow.
This method is called with the following arguments:
The default implementation logs (with the logging module) to the xworkflows.transitions logger.
This behaviour can be overridden on a per-workflow basis: simply override the Workflow.log_transition() method.
In order to perform advanced tasks when running transitions, libraries may hook directly at the ImplementationWrapper level.
For this, custom Workflow classes should override the Workflow.implementation_class attribute with their custom subclass and add extra behaviour there.
Possible customizations would be: